Only acquire opensearch lock if 2+ units online (instead of 1+ units)
carlcsaposs-canonical committed Apr 17, 2024
1 parent 7edf547 commit ce3b16a
Showing 1 changed file with 83 additions and 59 deletions.
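In short: before acquiring the OpenSearch document lock, the unit now counts the reachable OpenSearch nodes (via `ClusterTopology.nodes`) and only uses that lock when 2 or more nodes are online. With a single node online it falls back to the peer databag lock, so a unit that stops while holding the lock (e.g. when scaling to 0) cannot leave an unreleasable lock document behind. A simplified, self-contained sketch of that decision follows; it is illustrative only (the real logic is the `acquired` property in the diff below, which additionally refuses the lock if another unit already holds the OpenSearch lock document, a check omitted here):

# Illustrative sketch, not the charm code. The charm computes `online_nodes`
# with ClusterTopology.nodes() and talks to OpenSearch through
# self._opensearch.request; the function and labels below are made up.
def choose_lock_backend(online_nodes: int) -> str:
    """Return which lock backend the patched `acquired` property prefers."""
    if online_nodes >= 2:
        # The lock document survives this unit stopping, so the
        # OpenSearch document lock is safe to use.
        return "opensearch document lock"
    # 0 or 1 nodes online (previously, 1 node was enough to use the
    # OpenSearch lock): if this unit stopped while holding the document
    # lock, nothing could release it, so use the peer databag instead.
    return "peer databag lock"


if __name__ == "__main__":
    for n in (0, 1, 2, 3):
        print(f"{n} node(s) online -> {choose_lock_backend(n)}")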
142 changes: 83 additions & 59 deletions lib/charms/opensearch/v0/opensearch_locking.py
@@ -7,6 +7,7 @@
 import typing

 import ops
+from charms.opensearch.v0.helper_cluster import ClusterTopology
 from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError

 if typing.TYPE_CHECKING:
@@ -168,68 +169,91 @@ def acquired(self) -> bool:  # noqa: C901
             host = None
         alt_hosts = [host for host in self._charm.alt_hosts if self._opensearch.is_node_up(host)]
         if host or alt_hosts:
-            logger.debug("[Node lock] Using opensearch for lock")
-            # Acquire opensearch lock
-            # Create index if it doesn't exist
+            logger.debug("[Node lock] 1+ opensearch nodes online")
             try:
-                self._opensearch.request(
-                    "PUT",
-                    endpoint=f"/{self._OPENSEARCH_INDEX}",
-                    host=host,
-                    alt_hosts=alt_hosts,
-                    retries=3,
-                    payload={"settings": {"index": {"auto_expand_replicas": "0-all"}}},
-                )
-            except OpenSearchHttpError as e:
-                if (
-                    e.response_code == 400
-                    and e.response_body.get("error", {}).get("type")
-                    == "resource_already_exists_exception"
-                ):
-                    # Index already created
-                    pass
-                else:
-                    logger.exception("Error creating OpenSearch lock index")
-                    return False
-            # Attempt to create document id 0
-            try:
-                self._opensearch.request(
-                    "PUT",
-                    endpoint=f"/{self._OPENSEARCH_INDEX}/_create/0?refresh=true",
-                    host=host,
-                    alt_hosts=alt_hosts,
-                    retries=3,
-                    payload={"unit-name": self._charm.unit.name},
+                online_nodes = len(
+                    ClusterTopology.nodes(
                        self._opensearch, use_localhost=host is not None, hosts=alt_hosts
+                    )
                 )
-            except OpenSearchHttpError as e:
-                if e.response_code == 409 and "document already exists" in e.response_body.get(
-                    "error", {}
-                ).get("reason", ""):
-                    # Document already created
-                    if (unit := self._unit_with_lock(host)) != self._charm.unit.name:
-                        # Another unit has lock
-                        # (Or document deleted after last request & before request in
-                        # `self._lock_acquired()`)
-                        logger.debug(
-                            f"[Node lock] Not acquired. Unit with opensearch lock: {unit}"
-                        )
+            except OpenSearchHttpError:
+                logger.exception("Error getting OpenSearch nodes")
+                return False
+            logger.debug(f"[Node lock] Opensearch {online_nodes=}")
+            assert online_nodes > 0
+            if online_nodes >= 2:
+                logger.debug("[Node lock] Attempting to acquire opensearch lock")
+                # Acquire opensearch lock
+                # Create index if it doesn't exist
+                try:
+                    self._opensearch.request(
+                        "PUT",
+                        endpoint=f"/{self._OPENSEARCH_INDEX}",
+                        host=host,
+                        alt_hosts=alt_hosts,
+                        retries=3,
+                        payload={"settings": {"index": {"auto_expand_replicas": "0-all"}}},
+                    )
+                except OpenSearchHttpError as e:
+                    if (
+                        e.response_code == 400
+                        and e.response_body.get("error", {}).get("type")
+                        == "resource_already_exists_exception"
+                    ):
+                        # Index already created
+                        pass
+                    else:
+                        logger.exception("Error creating OpenSearch lock index")
                         return False
-                else:
-                    logger.exception("Error creating OpenSearch lock document")
-                    return False
-            # Lock acquired
-            # Release peer databag lock, if any
-            logger.debug("[Node lock] Acquired via opensearch")
-            self._peer.release()
-            logger.debug("[Node lock] Released redundant peer lock (if held)")
-            return True
-        else:
-            logger.debug("[Node lock] Using peer databag for lock")
-            # Request peer databag lock
-            # If return value is True:
-            # - Lock granted in previous Juju event
-            # - OR, unit is leader & lock granted in this Juju event
-            return self._peer.acquired
+                # Attempt to create document id 0
+                try:
+                    self._opensearch.request(
+                        "PUT",
+                        endpoint=f"/{self._OPENSEARCH_INDEX}/_create/0?refresh=true",
+                        host=host,
+                        alt_hosts=alt_hosts,
+                        retries=3,
+                        payload={"unit-name": self._charm.unit.name},
+                    )
+                except OpenSearchHttpError as e:
+                    if e.response_code == 409 and "document already exists" in e.response_body.get(
+                        "error", {}
+                    ).get("reason", ""):
+                        # Document already created
+                        pass
+                    else:
+                        logger.exception("Error creating OpenSearch lock document")
+                        return False
+            unit = self._unit_with_lock(host)
+            if unit == self._charm.unit.name:
+                # Lock acquired
+                # Release peer databag lock, if any
+                logger.debug("[Node lock] Acquired via opensearch")
+                self._peer.release()
+                logger.debug("[Node lock] Released redundant peer lock (if held)")
+                return True
+            if unit or online_nodes >= 2:
+                # Another unit has lock
+                # (Or document deleted after request to create document & before request in
+                # `self._unit_with_lock()`)
+                logger.debug(f"[Node lock] Not acquired. Unit with opensearch lock: {unit}")
+                return False
+            # If online_nodes == 1, we should acquire the lock via the peer databag.
+            # If we acquired the lock via OpenSearch and this unit was stopping, we would be unable
+            # to release the OpenSearch lock. For example, when scaling to 0.
+            # Then, when 1+ OpenSearch nodes are online, a unit that no longer exists could hold
+            # the lock.
+            # Note: if online_nodes > 1, this situation is still possible (e.g. if this unit was
+            # stopping and another unit went offline simultaneously)—but it's an edge case we don't
+            # support (to reduce complexity & improve robustness in other cases).
+            # If online_nodes > 1, we should re-attempt to acquire the OpenSearch lock.
+            logger.debug("[Node lock] No unit has opensearch lock")
+        logger.debug("[Node lock] Using peer databag for lock")
+        # Request peer databag lock
+        # If return value is True:
+        # - Lock granted in previous Juju event
+        # - OR, unit is leader & lock granted in this Juju event
+        return self._peer.acquired

     def release(self):
         """Release lock.
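For context on the mechanism the diff builds on: the OpenSearch-side lock is simply a document with id 0 in a dedicated index, created through the `_create` API so that only one caller can succeed; a 409 conflict means another unit already holds the lock. Below is a minimal sketch of that pattern using the plain `requests` library against a local, security-disabled node. The charm itself goes through `self._opensearch.request` (TLS, retries, alternate hosts) and its `_OPENSEARCH_INDEX` constant; the base URL, index name, and helper functions here are assumptions for illustration.

# Minimal sketch of the document-as-lock pattern used above (assumed names/URL).
import requests

BASE = "http://localhost:9200"  # assumed local dev node, security disabled
INDEX = "charm-node-lock"  # assumed; the charm uses its _OPENSEARCH_INDEX constant


def acquire(unit_name: str) -> bool:
    """Try to take the lock; True if this caller now holds it."""
    # Create the lock index; auto_expand_replicas keeps a replica on every node.
    response = requests.put(
        f"{BASE}/{INDEX}",
        json={"settings": {"index": {"auto_expand_replicas": "0-all"}}},
        timeout=10,
    )
    already_exists = (
        response.status_code == 400
        and response.json().get("error", {}).get("type")
        == "resource_already_exists_exception"
    )
    if not response.ok and not already_exists:
        return False
    # _create succeeds only if document id 0 does not exist yet;
    # a 409 response means another caller already holds the lock.
    response = requests.put(
        f"{BASE}/{INDEX}/_create/0?refresh=true",
        json={"unit-name": unit_name},
        timeout=10,
    )
    return response.status_code in (200, 201)


def release() -> None:
    """Drop the lock by deleting document id 0 (a 404 just means it was not held)."""
    requests.delete(f"{BASE}/{INDEX}/_doc/0?refresh=true", timeout=10)

The `_create` endpoint is what makes this a lock rather than a plain write: a second `PUT` to `/_doc/0` would silently overwrite the current holder, while `_create` fails with 409.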
