Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5947] Scale up: add new units as etcd cluster members #15

Merged
merged 30 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6535c03
add required server and cluster properties
reneradoi Dec 11, 2024
4f56089
WIP: start only juju leader, add all other units to etcd cluster subs…
reneradoi Dec 13, 2024
173cb14
add new units as learners in etcd cluster, promote learners to full-v…
reneradoi Dec 16, 2024
075ff38
naming
reneradoi Jan 6, 2025
c7d50a1
adjust unit tests, add logging
reneradoi Jan 6, 2025
065efaf
add basic integration testing for scale up
reneradoi Jan 6, 2025
e8eb8a9
integration test: check cluster formation after scale up
reneradoi Jan 6, 2025
c63002d
WIP: add continuous writes
reneradoi Jan 6, 2025
1175a20
move logic for starting an etcd member to cluster-manager
reneradoi Jan 7, 2025
091c830
fix continuous writes and ordering of test steps in integration test
reneradoi Jan 7, 2025
5709240
make continuous writes reusable if tests are run on an already existi…
reneradoi Jan 7, 2025
c78c4ab
add required server and cluster properties
reneradoi Dec 11, 2024
cfe4473
WIP: start only juju leader, add all other units to etcd cluster subs…
reneradoi Dec 13, 2024
efd2d9f
add new units as learners in etcd cluster, promote learners to full-v…
reneradoi Dec 16, 2024
03aa105
naming
reneradoi Jan 6, 2025
d498d19
adjust unit tests, add logging
reneradoi Jan 6, 2025
f559b7f
add basic integration testing for scale up
reneradoi Jan 6, 2025
ff943bb
integration test: check cluster formation after scale up
reneradoi Jan 6, 2025
248f2b9
WIP: add continuous writes
reneradoi Jan 6, 2025
3caaab7
move logic for starting an etcd member to cluster-manager
reneradoi Jan 7, 2025
e9dae29
fix continuous writes and ordering of test steps in integration test
reneradoi Jan 7, 2025
8daf356
make continuous writes reusable if tests are run on an already existi…
reneradoi Jan 7, 2025
61f6003
Merge remote-tracking branch 'origin/add-new-units-as-cluster-members…
reneradoi Jan 8, 2025
9dd0f25
re-add after being lost in rebasing: make continuous writes reusable …
reneradoi Jan 8, 2025
4a3dfa2
fix linting (also got lost in rebasing)
reneradoi Jan 8, 2025
f8e81de
reduce waiting time in integration test to 10 seconds
reneradoi Jan 10, 2025
d8a992a
address PR feedback
reneradoi Jan 13, 2025
a4eb570
fix integration test helper (return int instead str to allow for comp…
reneradoi Jan 13, 2025
fff1e3d
run the `member add` command in `etcdctl` in json format to allow for…
reneradoi Jan 14, 2025
7f3f848
Merge remote-tracking branch 'refs/remotes/origin/main' into add-new-…
reneradoi Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, *args):
self.state = ClusterState(self, substrate=SUBSTRATE)

# --- MANAGERS ---
self.cluster_manager = ClusterManager(self.state)
self.cluster_manager = ClusterManager(state=self.state, workload=self.workload)
self.config_manager = ConfigManager(
state=self.state, workload=self.workload, config=self.config
)
Expand Down
76 changes: 74 additions & 2 deletions src/common/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
import json
import logging
import subprocess
from typing import Tuple

from common.exceptions import EtcdAuthNotEnabledError, EtcdUserManagementError
from common.exceptions import (
EtcdAuthNotEnabledError,
EtcdClusterManagementError,
EtcdUserManagementError,
)
from literals import INTERNAL_USER, SNAP_NAME

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,7 +44,7 @@ def get_endpoint_status(self) -> dict:
try:
endpoint_status = json.loads(result)[0]
except json.JSONDecodeError:
pass
raise

return endpoint_status

Expand Down Expand Up @@ -83,6 +88,61 @@ def enable_auth(self) -> None:
else:
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
raise EtcdAuthNotEnabledError("Failed to enable authentication in etcd.")

def add_member_as_learner(self, member_name: str, peer_url: str) -> Tuple[str, str]:
"""Add a new member as learner to the etcd-cluster.

Returns:
- The updated `ETCD_INITIAL_CLUSTER` to be used as config `initial-cluster` for
starting the new cluster member
- The `MEMBER_ID` of the newly added member, to be used for promoting the new member
as full-voting after starting up
"""
if result := self._run_etcdctl(
command="member",
subcommand="add",
endpoints=self.client_url,
auth_username=self.user,
auth_password=self.password,
member=member_name,
peer_url=peer_url,
learner=True,
output_format="json",
):
try:
cluster = json.loads(result)
cluster_members = ""
member_id = ""
for member in cluster["members"]:
if member.get("name"):
cluster_members += f"{member['name']}={member['peerURLs'][0]},"
if member["peerURLs"][0] == peer_url:
# the member ID is returned as int, but needs to be processed as hex
# e.g. ID=4477466968462020105 needs to be stored as 3e23287c34b94e09
member_id = f"{member['ID']:0>2x}"
# for the newly added member, the member name will not be included in the response
# we have to append it separately
cluster_members += f"{member_name}={peer_url}"
except (json.JSONDecodeError, KeyError):
raise
logger.debug(f"Updated cluster members: {cluster_members}, new member: {member_id}")
return cluster_members, str(member_id)
else:
raise EtcdClusterManagementError(f"Failed to add {member_name} as learner.")

def promote_member(self, member_id: str) -> None:
"""Promote a learner-member to full-voting member in the etcd-cluster."""
if result := self._run_etcdctl(
command="member",
subcommand="promote",
endpoints=self.client_url,
auth_username=self.user,
auth_password=self.password,
member=member_id,
):
logger.debug(result)
else:
raise EtcdClusterManagementError(f"Failed to promote member {member_id}.")

def _run_etcdctl( # noqa: C901
self,
command: str,
Expand All @@ -95,6 +155,9 @@ def _run_etcdctl( # noqa: C901
auth_password: str | None = None,
user: str | None = None,
user_password: str | None = None,
member: str | None = None,
peer_url: str | None = None,
learner: bool = False,
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
output_format: str = "simple",
use_input: str | None = None,
) -> str | None:
Expand All @@ -112,6 +175,9 @@ def _run_etcdctl( # noqa: C901
auth_password: password used for authentication
user: username to be added or updated in etcd
user_password: password to be set for the user that is added to etcd
member: member name or id, required for commands `member add/update/promote/remove`
peer_url: url of a member to be used for cluster-internal communication
learner: flag for adding a new cluster member as not-voting member
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
output_format: set the output format (fields, json, protobuf, simple, table)
use_input: supply text input to be passed to the `etcdctl` command (e.g. for
non-interactive password change)
Expand All @@ -138,6 +204,12 @@ def _run_etcdctl( # noqa: C901
args.append(f"--user={auth_username}")
if auth_password:
args.append(f"--password={auth_password}")
if member:
args.append(member)
if peer_url:
args.append(f"--peer-urls={peer_url}")
if learner:
args.append("--learner=True")
if output_format:
args.append(f"-w={output_format}")
if use_input:
Expand Down
4 changes: 4 additions & 0 deletions src/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ class EtcdUserManagementError(Exception):

class EtcdAuthNotEnabledError(Exception):
"""Custom Exception if authentication could not be enabled in the etcd cluster."""


class EtcdClusterManagementError(Exception):
"""Custom Exception if cluster management operation fails."""
2 changes: 2 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def cluster(self) -> EtcdCluster:
def servers(self) -> Set[EtcdServer]:
"""Get all servers/units in the current peer relation, including this unit itself.

Note: This is not to be confused with the list of cluster members.

Returns:
Set of EtcdServers with their unit data.
"""
Expand Down
42 changes: 38 additions & 4 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def update(self, items: dict[str, str]) -> None:
self.relation_data.update(update_content)

for field in delete_fields:
self.relation_data.pop(field, None)
# use del instead of pop here because of error with dataplatform-libs
try:
del self.relation_data[field]
except KeyError:
pass


class EtcdServer(RelationState):
Expand Down Expand Up @@ -95,6 +99,11 @@ def client_url(self) -> str:
"""The client connection endpoint for the etcd server."""
return f"http://{self.ip}:{CLIENT_PORT}"

@property
def member_endpoint(self) -> str:
"""Concatenate member_name and peer_url."""
return f"{self.member_name}={self.peer_url}"


class EtcdCluster(RelationState):
"""State/Relation data collection for the etcd application."""
Expand All @@ -110,9 +119,9 @@ def __init__(
self.app = component

@property
def initial_cluster_state(self) -> str:
"""The initial cluster state ('new' or 'existing') of the etcd cluster."""
return self.relation_data.get("initial_cluster_state", "")
def cluster_state(self) -> str:
"""The cluster state ('new' or 'existing') of the etcd cluster."""
return self.relation_data.get("cluster_state", "")
reneradoi marked this conversation as resolved.
Show resolved Hide resolved

@property
def internal_user_credentials(self) -> dict[str, str]:
Expand All @@ -126,3 +135,28 @@ def internal_user_credentials(self) -> dict[str, str]:
def auth_enabled(self) -> bool:
"""Flag to check if authentication is already enabled in the Cluster."""
return self.relation_data.get("authentication", "") == "enabled"

@property
def cluster_members(self) -> str:
"""Get the list of current members added to the etcd cluster.

This string is the output of the `etcdctl member add` command issued by the juju leader
when a new unit joins and is added as cluster member. This string needs to be provided
as an argument `--initial-cluster` when starting the workload on the newly added unit.

This data is added to the peer cluster relation app databag when the first unit initializes
the cluster on startup after deployment.
"""
return self.relation_data.get("cluster_members", "")

@property
def learning_member(self) -> str:
"""Get the current learning member.

New cluster members are added to the etcd cluster as so-called learning members. That means
they are not participating in raft leader election because they do not yet have up-to-data
data. When added as cluster members with the `add member` command, the juju leader will
put the unit's `member_id` here. After promoting to full voting member, the juju leader
will unset the `member_id` here.
"""
return self.relation_data.get("learning_member", "")
86 changes: 47 additions & 39 deletions src/events/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

from common.exceptions import (
EtcdAuthNotEnabledError,
EtcdClusterManagementError,
EtcdUserManagementError,
RaftLeaderNotFoundError,
)
from common.secrets import get_secret_from_id
from literals import INTERNAL_USER, INTERNAL_USER_PASSWORD_CONFIG, PEER_RELATION, Status
Expand All @@ -45,16 +45,16 @@ def __init__(self, charm: "EtcdOperatorCharm"):
self.framework.observe(self.charm.on.start, self._on_start)
self.framework.observe(self.charm.on.config_changed, self._on_config_changed)
self.framework.observe(
self.charm.on[PEER_RELATION].relation_created, self._on_cluster_relation_created
self.charm.on[PEER_RELATION].relation_created, self._on_peer_relation_created
)
self.framework.observe(
self.charm.on[PEER_RELATION].relation_joined, self._on_cluster_relation_joined
self.charm.on[PEER_RELATION].relation_joined, self._on_peer_relation_joined
)
self.framework.observe(
self.charm.on[PEER_RELATION].relation_changed, self._on_cluster_relation_changed
self.charm.on[PEER_RELATION].relation_changed, self._on_peer_relation_changed
)
self.framework.observe(
self.charm.on[PEER_RELATION].relation_departed, self._on_cluster_relation_departed
self.charm.on[PEER_RELATION].relation_departed, self._on_peer_relation_departed
)
self.framework.observe(self.charm.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.charm.on.update_status, self._on_update_status)
Expand All @@ -68,29 +68,33 @@ def _on_install(self, event: ops.InstallEvent) -> None:

def _on_start(self, event: ops.StartEvent) -> None:
"""Handle start event."""
# Make sure all planned units have joined the peer relation before starting the cluster
if (
not self.charm.state.peer_relation
or len(self.charm.state.peer_relation.units) + 1 < self.charm.app.planned_units()
self.charm.config_manager.set_config_properties()

if self.charm.unit.is_leader() and not self.charm.state.cluster.cluster_state:
# this is the very first cluster start, this unit starts without being added as member
# all subsequent units will have to be added as member before starting the workload
self.charm.cluster_manager.start_member()

if not self.charm.state.cluster.auth_enabled:
try:
self.charm.cluster_manager.enable_authentication()
self.charm.state.cluster.update({"authentication": "enabled"})
except (EtcdAuthNotEnabledError, EtcdUserManagementError) as e:
logger.error(e)
self.charm.set_status(Status.AUTHENTICATION_NOT_ENABLED)
return
elif (
self.charm.state.unit_server.member_endpoint
in self.charm.state.cluster.cluster_members
):
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Deferring start because not all units joined peer-relation.")
self.charm.set_status(Status.NO_PEER_RELATION)
# this unit has been added to the etcd cluster
self.charm.cluster_manager.start_member()
else:
# this unit that has not yet been added to the cluster
# wait for leader to process `relation_joined` event and add the member to the cluster
event.defer()
return

self.charm.config_manager.set_config_properties()

self.charm.workload.start()

if self.charm.unit.is_leader() and not self.charm.state.cluster.auth_enabled:
try:
self.charm.cluster_manager.enable_authentication()
self.charm.state.cluster.update({"authentication": "enabled"})
except (EtcdAuthNotEnabledError, EtcdUserManagementError) as e:
logger.error(e)
self.charm.set_status(Status.AUTHENTICATION_NOT_ENABLED)
return

if self.charm.workload.alive():
self.charm.set_status(Status.ACTIVE)
else:
Expand All @@ -104,30 +108,34 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None:
if admin_secret_id := self.charm.config.get(INTERNAL_USER_PASSWORD_CONFIG):
self.update_admin_password(admin_secret_id)

def _on_cluster_relation_created(self, event: RelationCreatedEvent) -> None:
def _on_peer_relation_created(self, event: RelationCreatedEvent) -> None:
"""Handle event received by a new unit when joining the cluster relation."""
self.charm.state.unit_server.update(self.charm.cluster_manager.get_host_mapping())
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
if self.charm.unit.is_leader():
self.charm.state.cluster.update({"initial-cluster-state": "new"})

def _on_cluster_relation_changed(self, event: RelationChangedEvent) -> None:
def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None:
"""Handle all events related to the cluster-peer relation."""
pass
if self.charm.unit.is_leader() and self.charm.state.cluster.learning_member:
try:
# this will promote any learner, not only the unit that updated its relation data
self.charm.cluster_manager.promote_learning_member()
except EtcdClusterManagementError as e:
logger.warning(e)
event.defer()
return

def _on_cluster_relation_departed(self, event: RelationDepartedEvent) -> None:
def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""Handle event received by a unit leaves the cluster relation."""
pass

def _on_cluster_relation_joined(self, event: RelationJoinedEvent) -> None:
def _on_peer_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Handle event received by all units when a new unit joins the cluster relation."""
# Todo: remove this test at some point, this is just for showcasing that it works :)
# We will need to perform any HA-related action against the raft leader
# e.g. add members, trigger leader election, log compaction, etc.
try:
raft_leader = self.charm.cluster_manager.get_leader()
logger.info(f"Raft leader: {raft_leader}")
except RaftLeaderNotFoundError as e:
logger.warning(e)
if self.charm.unit.is_leader():
try:
self.charm.cluster_manager.add_member(event.unit.name)
except (EtcdClusterManagementError, KeyError) as e:
logger.warning(e)
event.defer()
return

def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
"""Handle all events in the 'cluster' peer relation."""
Expand Down
7 changes: 7 additions & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class StatusLevel:
log_level: DebugLevel


class EtcdClusterState(Enum):
"""Enum for Cluster state in etcd."""

EXISTING = "existing"
NEW = "new"


class Status(Enum):
"""Collection of possible statuses for the charm."""

Expand Down
Loading
Loading