Skip to content

Commit

Permalink
Add /cluster/settings support and fix plugin status
Browse files Browse the repository at this point in the history
  • Loading branch information
phvalguima committed Feb 23, 2024
1 parent 9fdb09c commit 7114625
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 116 deletions.
21 changes: 21 additions & 0 deletions lib/charms/opensearch/v0/helper_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ def suggest_roles(nodes: List[Node], planned_units: int) -> List[str]:

return base_roles + ["cluster_manager"]

@staticmethod
def get_cluster_settings(
opensearch: OpenSearchDistribution,
host: Optional[str] = None,
alt_hosts: Optional[List[str]] = None,
include_defaults: bool = False,
) -> Dict[str, any]:
"""Get the cluster settings."""
settings = opensearch.request(
"GET",
f"/_cluster/settings?flat_settings=true&include_defaults={str(include_defaults).lower()}",
host=host,
alt_hosts=alt_hosts,
)

result = {}
for el in ["defaults", "transient", "persistent"]:
if el in settings:
result = {**result, **settings[el]}
return result

@staticmethod
def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]:
"""Recompute the configuration of all the nodes (cluster set to auto-generate roles)."""
Expand Down
9 changes: 8 additions & 1 deletion lib/charms/opensearch/v0/opensearch_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def _query_backup_status(self, backup_id=None) -> BackupServiceState:
return BackupServiceState.RESPONSE_FAILED_NETWORK
return self.get_service_status(output)

def _on_s3_credentials_changed(self, event: EventBase) -> None:
def _on_s3_credentials_changed(self, event: EventBase) -> None: # noqa: C901
"""Calls the plugin manager config handler.
This method will iterate over the s3 relation and check:
Expand All @@ -489,8 +489,14 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None:
self.charm.status.set(MaintenanceStatus(BackupSetupStart))

try:
if not self.charm.plugin_manager.check_plugin_manager_ready():
raise OpenSearchNotFullyReadyError()

plugin = self.charm.plugin_manager.get_plugin(OpenSearchBackupPlugin)
if self.charm.plugin_manager.status(plugin) == PluginState.ENABLED:
# We need to explicitly disable the plugin before reconfiguration
# That happens because, differently from the actual configs, we cannot
# retrieve the key values and check if they changed.
self.charm.plugin_manager.apply_config(plugin.disable())
self.charm.plugin_manager.apply_config(plugin.config())
except OpenSearchError as e:
Expand All @@ -507,6 +513,7 @@ def _on_s3_credentials_changed(self, event: EventBase) -> None:
PluginState.ENABLED,
PluginState.WAITING_FOR_UPGRADE,
]:
logger.warning("_on_s3_credentials_changed: plugin is not enabled.")
event.defer()
return

Expand Down
19 changes: 7 additions & 12 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,22 +493,21 @@ def _on_config_changed(self, event: ConfigChangedEvent):
event.defer()
return

self.status.set(MaintenanceStatus(PluginConfigStart))
try:
self.status.set(MaintenanceStatus(PluginConfigStart))
if self.plugin_manager.run():
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
except OpenSearchNotFullyReadyError:
logger.warning("Plugin management: cluster not ready yet at config changed")
event.defer()
return
except OpenSearchPluginError:
self.status.set(BlockedStatus(PluginConfigChangeError))
self.status.clear(PluginConfigStart)
except (OpenSearchNotFullyReadyError, OpenSearchPluginError) as e:
if isinstance(e, OpenSearchNotFullyReadyError):
logger.warning("Plugin management: cluster not ready yet at config changed")
else:
self.status.set(BlockedStatus(PluginConfigChangeError))
event.defer()
return
self.status.clear(PluginConfigChangeError)
self.status.clear(PluginConfigStart)

def _on_set_password_action(self, event: ActionEvent):
"""Set new admin password from user input or generate if not passed."""
Expand Down Expand Up @@ -619,7 +618,6 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
event.defer()
self.defer_trigger_event.emit()
return

if not self._can_service_start():
self.peers_data.delete(Scope.UNIT, "starting")
event.defer()
Expand All @@ -632,9 +630,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
self.peers_data.delete(Scope.UNIT, "starting")
event.defer()
return

self.unit.status = WaitingStatus(WaitingToStart)

rel = self.model.get_relation(PeerRelationName)
for unit in rel.units.union({self.unit}):
if rel.data[unit].get("starting") == "True":
Expand All @@ -646,7 +642,6 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
try:
# Retrieve the nodes of the cluster, needed to configure this node
nodes = self._get_nodes(False)

# validate the roles prior to starting
self.opensearch_peer_cm.validate_roles(nodes, on_new_unit=True)

Expand Down
124 changes: 81 additions & 43 deletions lib/charms/opensearch/v0/opensearch_plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
config-changed, upgrade, s3-credentials-changed, etc.
"""

import functools
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from charms.opensearch.v0.helper_cluster import ClusterTopology
from charms.opensearch.v0.opensearch_backups import OpenSearchBackupPlugin
from charms.opensearch.v0.opensearch_exceptions import (
OpenSearchCmdError,
OpenSearchNotFullyReadyError,
Expand Down Expand Up @@ -78,6 +81,11 @@ def __init__(self, charm):
self._keystore = OpenSearchKeystore(self._charm)
self._event_scope = OpenSearchPluginEventScope.DEFAULT

@functools.cached_property
def cluster_config(self):
"""Returns the cluster configuration."""
return ClusterTopology.get_cluster_settings(self._charm.opensearch, include_defaults=True)

def set_event_scope(self, event_scope: OpenSearchPluginEventScope) -> None:
"""Sets the event scope of the plugin manager.
Expand Down Expand Up @@ -129,25 +137,31 @@ def _extra_conf(self, plugin_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
}
return {**self._charm_config, "opensearch-version": self._opensearch.version}

def check_plugin_manager_ready(self) -> bool:
"""Checks if the plugin manager is ready to run."""
return (
self._charm.opensearch.is_node_up()
and len(self._charm._get_nodes(True)) == self._charm.app.planned_units()
and self._charm.health.apply()
in [
HealthColors.GREEN,
HealthColors.YELLOW,
]
)

def run(self) -> bool:
"""Runs a check on each plugin: install, execute config changes or remove.
This method should be called at config-changed event. Returns if needed restart.
"""
if not self._charm.opensearch.is_started() or self._charm.health.apply() not in [
HealthColors.GREEN,
HealthColors.YELLOW,
]:
# If the health is not green, then raise a cluster-not-ready error
# The classes above should then defer their own events in waiting.
# Defer is important as next steps to configure plugins will involve
# calls to the APIs of the cluster.
logger.info("Cluster not ready, wait for the next event...")
if not self.check_plugin_manager_ready():
raise OpenSearchNotFullyReadyError()

err_msgs = []
restart_needed = False
for plugin in self.plugins:
logger.info(f"Checking plugin {plugin.name}...")
logger.debug(f"Status: {self.status(plugin)}")
# These are independent plugins.
# Any plugin that errors, if that is an OpenSearchPluginError, then
# it is an "expected" error, such as missing additional config; should
Expand All @@ -172,8 +186,8 @@ def run(self) -> bool:
# This is a more serious issue, as we are missing some input from
# the user. The charm should block.
err_msgs.append(str(e))
except OpenSearchPluginError as e:
logger.error(f"Plugin {plugin.name} failed: {str(e)}")
logger.debug(f"Finished Plugin {plugin.name} status: {self.status(plugin)}")
logger.info(f"Plugin check finished, restart needed: {restart_needed}")

if err_msgs:
raise OpenSearchPluginError("\n".join(err_msgs))
Expand Down Expand Up @@ -255,34 +269,62 @@ def _disable_if_needed(self, plugin: OpenSearchPlugin) -> bool:
except KeyError as e:
raise OpenSearchPluginMissingConfigError(plugin.name, configs=[f"{e}"])

def _compute_settings(
self, config: OpenSearchPluginConfig
) -> Tuple[Dict[str, str], Dict[str, str]]:
"""Returns the current and the new configuration."""
current_settings = self.cluster_config
to_remove = dict(
zip(config.config_entries_to_del, [None] * len(config.config_entries_to_del))
)
new_conf = dict(current_settings | to_remove | config.config_entries_to_add)

logger.debug(
"Difference between current and new configuration: \n"
+ str(
{
"in current but not in new_conf": {
k: v for k, v in current_settings.items() if k not in new_conf.keys()
},
"in new_conf but not in current": {
k: v for k, v in new_conf.items() if k not in current_settings.keys()
},
"in both but different values": {
k: v
for k, v in new_conf.items()
if k in current_settings.keys() and current_settings[k] != v
},
}
)
)
return current_settings, new_conf

def apply_config(self, config: OpenSearchPluginConfig) -> bool:
"""Runs the configuration changes as passed via OpenSearchPluginConfig.
For each: configuration and secret
1) Remove the entries to be deleted
2) Add entries, if available
For example:
KNN needs to:
1) Remove from configuration: {"knn.plugin.enabled": "True"}
2) Add to configuration: {"knn.plugin.enabled": "False"}
Returns True if a configuration change was performed.
"""
self._keystore.delete(config.secret_entries_to_del)
self._keystore.add(config.secret_entries_to_add)
# Add and remove configuration if applies
if config.secret_entries_to_del or config.secret_entries_to_add:
self._keystore.reload_keystore()

current_settings, new_conf = self._compute_settings(config)
if current_settings == new_conf:
# Nothing to do here
logger.info("apply_config: nothing to do, return")
return False

# Update the configuration
if config.config_entries_to_del:
self._opensearch_config.delete_plugin(config.config_entries_to_del)

if config.config_entries_to_add:
self._opensearch_config.add_plugin(config.config_entries_to_add)

if config.secret_entries_to_del or config.secret_entries_to_add:
self._keystore.reload_keystore()

# Return True if some configuration entries changed
return True if config.config_entries_to_add or config.config_entries_to_del else False
return True

def status(self, plugin: OpenSearchPlugin) -> PluginState:
"""Returns the status for a given plugin."""
Expand Down Expand Up @@ -320,32 +362,28 @@ def _user_requested_to_enable(self, plugin: OpenSearchPlugin) -> bool:
def _is_enabled(self, plugin: OpenSearchPlugin) -> bool:
"""Returns true if plugin is enabled.
The main question to answer is if we would remove the configuration from
opensearch.yaml and/or add some other configuration instead.
If yes, then we know that the service is enabled.
The main question to answer is if we would have it in the configuration
from cluster settings. If yes, then we know that the service is enabled.
The disable() method is used, as a given entry will be removed from opensearch.yml.
If disable() returns a list, then check if the keys in the stored_plugin_conf
matches the list values; otherwise, if a dict, then check if the keys and values match.
If they match in any of the cases above, then return True.
Check if the configuration from enable() is present or not.
"""
try:
plugin_conf = plugin.disable().config_entries_to_del
keystore_conf = plugin.disable().secret_entries_to_del
stored_plugin_conf = self._opensearch_config.get_plugin(plugin_conf)

if any(k not in self._keystore.list() for k in keystore_conf):
current_settings, new_conf = self._compute_settings(plugin.config())
if current_settings != new_conf:
return False

# Using sets to guarantee matches; stored_plugin_conf will be a dict
if isinstance(plugin_conf, list):
return set(plugin_conf) == set(stored_plugin_conf)
else: # plugin_conf is a dict
return plugin_conf == stored_plugin_conf
# Now, focus on the keystore part
keys_available = self._keystore.list()
keys_to_add = plugin.config().secret_entries_to_add
if any(k not in keys_available for k in keys_to_add):
return False
keys_to_del = plugin.config().secret_entries_to_del
if any(k in keys_available for k in keys_to_del):
return False
except (KeyError, OpenSearchPluginError) as e:
logger.warning(f"_is_enabled: error with {e}")
return False
return True

def _needs_upgrade(self, plugin: OpenSearchPlugin) -> bool:
"""Returns true if plugin needs upgrade."""
Expand Down
Loading

0 comments on commit 7114625

Please sign in to comment.