Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OpenSearch for locking with fallback to peer databag #211

Merged
merged 45 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0dc3a99
Use OpenSearch for locking with fallback to peer databag
carlcsaposs-canonical Apr 2, 2024
a97c3f4
TEMP disable lint/unit CI dep
carlcsaposs-canonical Apr 2, 2024
0578cd8
TEMP debug keyerror
carlcsaposs-canonical Apr 3, 2024
f3ad6d2
TEMP disable concurrency
carlcsaposs-canonical Apr 3, 2024
11b42d5
Add handling for missing peer relation
carlcsaposs-canonical Apr 3, 2024
a2ab9aa
remove retry comment
carlcsaposs-canonical Apr 3, 2024
7215316
todo comment
carlcsaposs-canonical Apr 3, 2024
596f3eb
replace if with assert
carlcsaposs-canonical Apr 3, 2024
8c10de4
Add refresh to delete
carlcsaposs-canonical Apr 3, 2024
bf5033b
Revert "TEMP debug keyerror"
carlcsaposs-canonical Apr 3, 2024
81b2521
Fix unit tests
carlcsaposs-canonical Apr 3, 2024
598a919
Fix unit tests
carlcsaposs-canonical Apr 3, 2024
f65b1d2
Fix backup unit tests
carlcsaposs-canonical Apr 3, 2024
23f6ade
Fix ml unit test
carlcsaposs-canonical Apr 3, 2024
02f7740
Fix lint
carlcsaposs-canonical Apr 3, 2024
b9b4804
Revert "TEMP disable lint/unit CI dep"
carlcsaposs-canonical Apr 3, 2024
f0b5b00
Revert "TEMP disable concurrency"
carlcsaposs-canonical Apr 3, 2024
54bc3d0
Emit start event from restart instead of calling handler
carlcsaposs-canonical Apr 3, 2024
e634e51
Add docstrings
carlcsaposs-canonical Apr 3, 2024
5b69b9b
TEMP disable concurrency
carlcsaposs-canonical Apr 3, 2024
46ea83a
trigger CI
carlcsaposs-canonical Apr 4, 2024
aa5082e
Revert "trigger CI"
carlcsaposs-canonical Apr 4, 2024
cf7920b
leftover from rebase
carlcsaposs-canonical Apr 5, 2024
6f0ae0d
remove todo comment
carlcsaposs-canonical Apr 5, 2024
3e10a0c
Log HTTP errors & retry (on next event) instead of re-raising
carlcsaposs-canonical Apr 5, 2024
8907d07
trigger ci
carlcsaposs-canonical Apr 5, 2024
dcaf11a
Revert "trigger ci"
carlcsaposs-canonical Apr 5, 2024
387c800
Revert "Log HTTP errors & retry (on next event) instead of re-raising"
carlcsaposs-canonical Apr 5, 2024
05fd3f5
Add debug logs
carlcsaposs-canonical Apr 5, 2024
bd167ec
debug 503 issue
carlcsaposs-canonical Apr 10, 2024
87499ef
Release lock if stop fails
carlcsaposs-canonical Apr 11, 2024
79b94b3
debug 503 issue
carlcsaposs-canonical Apr 11, 2024
6d96fcc
fix alt_hosts 503 errors
carlcsaposs-canonical Apr 11, 2024
070348f
Ignore peer databag lock if 1+ opensearch units online
carlcsaposs-canonical Apr 11, 2024
0e1b6b1
fix lint
carlcsaposs-canonical Apr 11, 2024
04eefc6
Log HTTP errors & retry (on next event) instead of re-raising
carlcsaposs-canonical Apr 11, 2024
d2bdd25
Remove role-rebalancing
carlcsaposs-canonical Apr 12, 2024
bfa7e26
Revert "debug 503 issue"
carlcsaposs-canonical Apr 12, 2024
907cf55
Revert "debug 503 issue"
carlcsaposs-canonical Apr 12, 2024
75bbb0a
Revert "TEMP disable concurrency"
carlcsaposs-canonical Apr 12, 2024
95ba5eb
Improve exception message
carlcsaposs-canonical Apr 12, 2024
ffc6cdc
Add status
carlcsaposs-canonical Apr 15, 2024
aa95f75
Rename lock class & add docstring
carlcsaposs-canonical Apr 15, 2024
7edf547
add type hint
carlcsaposs-canonical Apr 16, 2024
ce3b16a
Only acquire opensearch lock if 2+ units online (instead of 1+ units)
carlcsaposs-canonical Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions lib/charms/opensearch/v0/helper_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,8 @@ def suggest_roles(nodes: List[Node], planned_units: int) -> List[str]:
— odd: "all" the nodes are cm_eligible nodes.
— even: "all - 1" are cm_eligible and 1 data node.
"""
max_cms = ClusterTopology.max_cluster_manager_nodes(planned_units)

base_roles = ["data", "ingest", "ml", "coordinating_only"]
full_roles = base_roles + ["cluster_manager"]
nodes_by_roles = ClusterTopology.nodes_count_by_role(nodes)
if nodes_by_roles.get("cluster_manager", 0) == max_cms:
return base_roles
return full_roles
# TODO: remove in https://github.com/canonical/opensearch-operator/issues/230
return ["data", "ingest", "ml", "coordinating_only", "cluster_manager"]

@staticmethod
def get_cluster_settings(
Expand All @@ -74,6 +68,7 @@ def get_cluster_settings(

@staticmethod
def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]:
# TODO: remove in https://github.com/canonical/opensearch-operator/issues/230
"""Recompute the configuration of all the nodes (cluster set to auto-generate roles)."""
if not nodes:
return {}
Expand All @@ -86,19 +81,11 @@ def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]:
else:
# Leave node unchanged
nodes_by_name[node.name] = node
base_roles = ["data", "ingest", "ml", "coordinating_only"]
full_roles = base_roles + ["cluster_manager"]
highest_unit_number = max(node.unit_number for node in current_cluster_nodes)
for node in current_cluster_nodes:
# we do this in order to remove any non-default role / add any missing default role
if len(current_cluster_nodes) % 2 == 0 and node.unit_number == highest_unit_number:
roles = base_roles
else:
roles = full_roles

nodes_by_name[node.name] = Node(
name=node.name,
roles=roles,
# we do this in order to remove any non-default role / add any missing default role
roles=["data", "ingest", "ml", "coordinating_only", "cluster_manager"],
ip=node.ip,
app_name=node.app_name,
unit_number=node.unit_number,
Expand Down
110 changes: 61 additions & 49 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from charms.opensearch.v0.opensearch_fixes import OpenSearchFixes
from charms.opensearch.v0.opensearch_health import HealthColors, OpenSearchHealth
from charms.opensearch.v0.opensearch_internal_data import RelationDataStore, Scope
from charms.opensearch.v0.opensearch_locking import OpenSearchOpsLock
from charms.opensearch.v0.opensearch_locking import OpenSearchNodeLock
from charms.opensearch.v0.opensearch_nodes_exclusions import (
ALLOCS_TO_DELETE,
VOTING_TO_DELETE,
Expand All @@ -87,7 +87,6 @@
from charms.opensearch.v0.opensearch_secrets import OpenSearchSecrets
from charms.opensearch.v0.opensearch_tls import OpenSearchTLS
from charms.opensearch.v0.opensearch_users import OpenSearchUserManager
from charms.rolling_ops.v0.rollingops import RollingOpsManager
from charms.tls_certificates_interface.v3.tls_certificates import (
CertificateAvailableEvent,
)
Expand All @@ -105,7 +104,7 @@
StorageDetachingEvent,
UpdateStatusEvent,
)
from ops.framework import EventBase
from ops.framework import EventBase, EventSource
from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

Expand All @@ -127,9 +126,26 @@
logger = logging.getLogger(__name__)


class _StartOpenSearch(EventBase):
"""Attempt to acquire lock & start OpenSearch.

This event will be deferred until OpenSearch starts.
"""


class _RestartOpenSearch(EventBase):
"""Attempt to acquire lock & restart OpenSearch.

This event will be deferred until OpenSearch stops. Then, `_StartOpenSearch` will be emitted.
"""


class OpenSearchBaseCharm(CharmBase):
"""Base class for OpenSearch charms."""

_start_opensearch_event = EventSource(_StartOpenSearch)
_restart_opensearch_event = EventSource(_RestartOpenSearch)

def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
super().__init__(*args)

Expand All @@ -147,7 +163,7 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
self.tls = OpenSearchTLS(self, TLS_RELATION)
self.status = Status(self)
self.health = OpenSearchHealth(self)
self.ops_lock = OpenSearchOpsLock(self)
self.node_lock = OpenSearchNodeLock(self)
self.cos_integration = COSAgentProvider(
self,
relation_name=COSRelationName,
Expand All @@ -161,14 +177,14 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
self.plugin_manager = OpenSearchPluginManager(self)
self.backup = OpenSearchBackup(self)

self.service_manager = RollingOpsManager(
self, relation=SERVICE_MANAGER, callback=self._start_opensearch
)
self.user_manager = OpenSearchUserManager(self)
self.opensearch_provider = OpenSearchProvider(self)
self.peer_cluster_provider = OpenSearchPeerClusterProvider(self)
self.peer_cluster_requirer = OpenSearchPeerClusterRequirer(self)

self.framework.observe(self._start_opensearch_event, self._start_opensearch)
self.framework.observe(self._restart_opensearch_event, self._restart_opensearch)

self.framework.observe(self.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.update_status, self._on_update_status)
Expand Down Expand Up @@ -271,7 +287,7 @@ def _on_start(self, event: StartEvent):

# request the start of OpenSearch
self.status.set(WaitingStatus(RequestUnitServiceOps.format("start")))
self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch")
self._start_opensearch_event.emit()

def _apply_peer_cm_directives_and_check_if_can_start(self) -> bool:
"""Apply the directives computed by the opensearch peer cluster manager."""
Expand Down Expand Up @@ -404,7 +420,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent):
def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # noqa: C901
"""Triggered when removing unit, Prior to the storage being detached."""
# acquire lock to ensure only 1 unit removed at a time
self.ops_lock.acquire()
if not self.node_lock.acquired:
# Raise uncaught exception to prevent Juju from removing unit
raise Exception("Unable to acquire lock: Another unit is starting or stopping.")

# if the leader is departing, and this hook fails "leader elected" won"t trigger,
# so we want to re-balance the node roles from here
Expand Down Expand Up @@ -446,7 +464,7 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # no
raise OpenSearchHAError(ClusterHealthUnknown)
finally:
# release lock
self.ops_lock.release()
self.node_lock.release()

def _on_update_status(self, event: UpdateStatusEvent):
"""On update status event.
Expand Down Expand Up @@ -524,9 +542,7 @@ def _on_config_changed(self, event: ConfigChangedEvent): # noqa C901
if self.unit.is_leader():
self.status.set(MaintenanceStatus(PluginConfigCheck), app=True)
if self.plugin_manager.run():
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()
except (OpenSearchNotFullyReadyError, OpenSearchPluginError) as e:
if isinstance(e, OpenSearchNotFullyReadyError):
logger.warning("Plugin management: cluster not ready yet at config changed")
Expand Down Expand Up @@ -618,9 +634,7 @@ def on_tls_conf_set(

# In case of renewal of the unit transport layer cert - restart opensearch
if renewal and self.is_admin_user_configured() and self.is_tls_fully_configured():
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()

def on_tls_relation_broken(self, _: RelationBrokenEvent):
"""As long as all certificates are produced, we don't do anything."""
Expand Down Expand Up @@ -699,8 +713,12 @@ def _handle_change_to_main_orchestrator_if_needed(

self.tls.request_new_admin_certificate()

def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
def _start_opensearch(self, event: _StartOpenSearch) -> None: # noqa: C901
"""Start OpenSearch, with a generated or passed conf, if all resources configured."""
if not self.node_lock.acquired:
logger.debug("Lock to start opensearch not acquired. Will retry next event")
event.defer()
return
self.peers_data.delete(Scope.UNIT, "started")
if self.opensearch.is_started():
try:
Expand All @@ -710,25 +728,18 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
return

if not self._can_service_start():
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
return

if self.peers_data.get(Scope.UNIT, "starting", False) and self.opensearch.is_failed():
self.peers_data.delete(Scope.UNIT, "starting")
if self.opensearch.is_failed():
self.node_lock.release()
carlcsaposs-canonical marked this conversation as resolved.
Show resolved Hide resolved
self.status.set(BlockedStatus(ServiceStartError))
event.defer()
return

self.unit.status = WaitingStatus(WaitingToStart)

rel = self.model.get_relation(PeerRelationName)
for unit in rel.units.union({self.unit}):
if rel.data[unit].get("starting") == "True":
event.defer()
return

self.peers_data.put(Scope.UNIT, "starting", True)

try:
# Retrieve the nodes of the cluster, needed to configure this node
nodes = self._get_nodes(False)
Expand All @@ -739,12 +750,12 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
# Set the configuration of the node
self._set_node_conf(nodes)
except OpenSearchHttpError:
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
return
except OpenSearchProvidedRolesException as e:
logger.exception(e)
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
self.unit.status = BlockedStatus(str(e))
return
Expand All @@ -761,7 +772,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
event.defer()
except OpenSearchStartError as e:
logger.exception(e)
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
self.status.set(BlockedStatus(ServiceStartError))
event.defer()

Expand All @@ -788,8 +799,8 @@ def _post_start_init(self, event: EventBase):
# Remove the exclusions that could not be removed when no units were online
self.opensearch_exclusions.delete_current()

# Remove the 'starting' flag on the unit
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()

self.peers_data.put(Scope.UNIT, "started", True)

# apply post_start fixes to resolve start related upstream bugs
Expand Down Expand Up @@ -827,18 +838,23 @@ def _stop_opensearch(self) -> None:
# 3. Remove the exclusions
self.opensearch_exclusions.delete_current()

def _restart_opensearch(self, event: EventBase) -> None:
def _restart_opensearch(self, event: _RestartOpenSearch) -> None:
"""Restart OpenSearch if possible."""
if not self.peers_data.get(Scope.UNIT, "starting", False):
try:
self._stop_opensearch()
except OpenSearchStopError as e:
logger.exception(e)
event.defer()
self.status.set(WaitingStatus(ServiceIsStopping))
return
if not self.node_lock.acquired:
logger.debug("Lock to restart opensearch not acquired. Will retry next event")
event.defer()
return

self._start_opensearch(event)
try:
self._stop_opensearch()
except OpenSearchStopError as e:
logger.exception(e)
self.node_lock.release()
event.defer()
self.status.set(WaitingStatus(ServiceIsStopping))
return

self._start_opensearch_event.emit()

def _can_service_start(self) -> bool:
"""Return if the opensearch service can start."""
Expand Down Expand Up @@ -917,9 +933,7 @@ def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901
return False

self.status.set(WaitingStatus(WaitingToStart))
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()
return True

def _purge_users(self):
Expand Down Expand Up @@ -1148,9 +1162,7 @@ def _reconfigure_and_restart_unit_if_needed(self):
return

self.status.set(WaitingStatus(WaitingToStart))
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()

def _recompute_roles_if_needed(self, event: RelationChangedEvent):
"""Recompute node roles:self-healing that didn't trigger leader related event occurred."""
Expand Down
4 changes: 0 additions & 4 deletions lib/charms/opensearch/v0/opensearch_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ class OpenSearchNotFullyReadyError(OpenSearchError):
"""Exception thrown when a node is started but not full ready to take on requests."""


class OpenSearchOpsLockAlreadyAcquiredError(OpenSearchError):
"""Exception thrown when a node is started but not full ready to take on requests."""


class OpenSearchCmdError(OpenSearchError):
"""Exception thrown when an OpenSearch bin command fails."""

Expand Down
Loading
Loading