Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5947] Scale up: add new units as etcd cluster members #15

Merged
merged 30 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6535c03
add required server and cluster properties
reneradoi Dec 11, 2024
4f56089
WIP: start only juju leader, add all other units to etcd cluster subs…
reneradoi Dec 13, 2024
173cb14
add new units as learners in etcd cluster, promote learners to full-v…
reneradoi Dec 16, 2024
075ff38
naming
reneradoi Jan 6, 2025
c7d50a1
adjust unit tests, add logging
reneradoi Jan 6, 2025
065efaf
add basic integration testing for scale up
reneradoi Jan 6, 2025
e8eb8a9
integration test: check cluster formation after scale up
reneradoi Jan 6, 2025
c63002d
WIP: add continuous writes
reneradoi Jan 6, 2025
1175a20
move logic for starting an etcd member to cluster-manager
reneradoi Jan 7, 2025
091c830
fix continuous writes and ordering of test steps in integration test
reneradoi Jan 7, 2025
5709240
make continuous writes reusable if tests are run on an already existi…
reneradoi Jan 7, 2025
c78c4ab
add required server and cluster properties
reneradoi Dec 11, 2024
cfe4473
WIP: start only juju leader, add all other units to etcd cluster subs…
reneradoi Dec 13, 2024
efd2d9f
add new units as learners in etcd cluster, promote learners to full-v…
reneradoi Dec 16, 2024
03aa105
naming
reneradoi Jan 6, 2025
d498d19
adjust unit tests, add logging
reneradoi Jan 6, 2025
f559b7f
add basic integration testing for scale up
reneradoi Jan 6, 2025
ff943bb
integration test: check cluster formation after scale up
reneradoi Jan 6, 2025
248f2b9
WIP: add continuous writes
reneradoi Jan 6, 2025
3caaab7
move logic for starting an etcd member to cluster-manager
reneradoi Jan 7, 2025
e9dae29
fix continuous writes and ordering of test steps in integration test
reneradoi Jan 7, 2025
8daf356
make continuous writes reusable if tests are run on an already existi…
reneradoi Jan 7, 2025
61f6003
Merge remote-tracking branch 'origin/add-new-units-as-cluster-members…
reneradoi Jan 8, 2025
9dd0f25
re-add after being lost in rebasing: make continuous writes reusable …
reneradoi Jan 8, 2025
4a3dfa2
fix linting (also got lost in rebasing)
reneradoi Jan 8, 2025
f8e81de
reduce waiting time in integration test to 10 seconds
reneradoi Jan 10, 2025
d8a992a
address PR feedback
reneradoi Jan 13, 2025
a4eb570
fix integration test helper (return int instead str to allow for comp…
reneradoi Jan 13, 2025
fff1e3d
run the `member add` command in `etcdctl` in json format to allow for…
reneradoi Jan 14, 2025
7f3f848
Merge remote-tracking branch 'refs/remotes/origin/main' into add-new-…
reneradoi Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, *args):
self.state = ClusterState(self, substrate=SUBSTRATE)

# --- MANAGERS ---
self.cluster_manager = ClusterManager(self.state)
self.cluster_manager = ClusterManager(state=self.state, workload=self.workload)
self.config_manager = ConfigManager(
state=self.state, workload=self.workload, config=self.config
)
Expand Down
67 changes: 66 additions & 1 deletion src/common/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
import json
import logging
import subprocess
from typing import Tuple

from common.exceptions import EtcdAuthNotEnabledError, EtcdUserManagementError
from common.exceptions import (
EtcdAuthNotEnabledError,
EtcdClusterManagementError,
EtcdUserManagementError,
)
from literals import INTERNAL_USER, SNAP_NAME

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,6 +88,54 @@ def enable_auth(self) -> None:
else:
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
raise EtcdAuthNotEnabledError("Failed to enable authentication in etcd.")

def add_member_as_learner(self, member_name: str, peer_url: str) -> Tuple[str, str]:
    """Add a new member as learner to the etcd-cluster.

    Args:
        member_name: name of the member to be added
        peer_url: url the new member uses for cluster-internal communication

    Returns:
        - The updated `ETCD_INITIAL_CLUSTER` to be used as config `initial-cluster` for
            starting the new cluster member
        - The `MEMBER_ID` of the newly added member, to be used for promoting the new member
            as full-voting after starting up

    Raises:
        EtcdClusterManagementError: if the command returns nothing, or if its output does
            not contain both the member id and the `ETCD_INITIAL_CLUSTER` entry
    """
    output = self._run_etcdctl(
        command="member",
        subcommand="add",
        endpoints=self.client_url,
        auth_username=self.user,
        auth_password=self.password,
        member=member_name,
        peer_url=peer_url,
        learner=True,
    )
    if not output:
        raise EtcdClusterManagementError(f"Failed to add {member_name} as learner.")

    # the subcommand will return the following output:
    # Member 3e23287c34b94e09 added to cluster c4d701b62779596b
    #
    # ETCD_NAME="etcd8"
    # ETCD_INITIAL_CLUSTER="etcd8=http://10.86.196.119:2380,etcd7=http://10.86.196.232:2380"
    # ETCD_INITIAL_ADVERTISE_PEER_URLS="http://10.86.196.119:2380"
    # ETCD_INITIAL_CLUSTER_STATE="existing"
    #
    # Locate the relevant lines by their content instead of by position, so that
    # additional or missing blank lines do not break the parsing.
    prefix = "ETCD_INITIAL_CLUSTER="
    member_id = None
    initial_cluster = None
    for line in (raw.strip() for raw in output.splitlines()):
        if line.startswith(prefix):
            # `ETCD_INITIAL_CLUSTER_STATE=` does not match because of the trailing `=`
            initial_cluster = line.removeprefix(prefix).strip('"')
        elif member_id is None and line.startswith("Member "):
            member_id = line.split()[1]
            logger.debug(f"Updated cluster members: {line}")

    if not member_id or not initial_cluster:
        # surface malformed output as the error type callers already handle
        raise EtcdClusterManagementError(
            f"Failed to add {member_name} as learner: unexpected output of `member add`."
        )

    return initial_cluster, member_id

def promote_member(self, member_id: str) -> None:
    """Promote the learner identified by `member_id` to a full-voting cluster member.

    Raises:
        EtcdClusterManagementError: if `etcdctl member promote` returns nothing
            (None or an empty string)
    """
    output = self._run_etcdctl(
        command="member",
        subcommand="promote",
        endpoints=self.client_url,
        auth_username=self.user,
        auth_password=self.password,
        member=member_id,
    )
    if not output:
        raise EtcdClusterManagementError(f"Failed to promote member {member_id}.")

    logger.debug(output)

def _run_etcdctl( # noqa: C901
self,
command: str,
Expand All @@ -95,6 +148,9 @@ def _run_etcdctl( # noqa: C901
auth_password: str | None = None,
user: str | None = None,
user_password: str | None = None,
member: str | None = None,
peer_url: str | None = None,
learner: bool = False,
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
output_format: str = "simple",
use_input: str | None = None,
) -> str | None:
Expand All @@ -112,6 +168,9 @@ def _run_etcdctl( # noqa: C901
auth_password: password used for authentication
user: username to be added or updated in etcd
user_password: password to be set for the user that is added to etcd
member: member name or id, required for commands `member add/update/promote/remove`
peer_url: url of a member to be used for cluster-internal communication
learner: flag for adding a new cluster member as not-voting member
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
output_format: set the output format (fields, json, protobuf, simple, table)
use_input: supply text input to be passed to the `etcdctl` command (e.g. for
non-interactive password change)
Expand All @@ -138,6 +197,12 @@ def _run_etcdctl( # noqa: C901
args.append(f"--user={auth_username}")
if auth_password:
args.append(f"--password={auth_password}")
if member:
args.append(member)
if peer_url:
args.append(f"--peer-urls={peer_url}")
if learner:
args.append("--learner=True")
if output_format:
args.append(f"-w={output_format}")
if use_input:
Expand Down
4 changes: 4 additions & 0 deletions src/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ class EtcdUserManagementError(Exception):

class EtcdAuthNotEnabledError(Exception):
"""Custom Exception if authentication could not be enabled in the etcd cluster."""


class EtcdClusterManagementError(Exception):
    """Raised when a cluster management operation (e.g. adding or promoting a member) fails."""
2 changes: 2 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def cluster(self) -> EtcdCluster:
def servers(self) -> Set[EtcdServer]:
"""Get all servers/units in the current peer relation, including this unit itself.

Note: This is not to be confused with the list of cluster members.

Returns:
Set of EtcdServers with their unit data.
"""
Expand Down
44 changes: 40 additions & 4 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def update(self, items: dict[str, str]) -> None:
self.relation_data.update(update_content)

for field in delete_fields:
self.relation_data.pop(field, None)
# use del instead of pop here because of error with dataplatform-libs
del self.relation_data[field]
reneradoi marked this conversation as resolved.
Show resolved Hide resolved


class EtcdServer(RelationState):
Expand Down Expand Up @@ -95,6 +96,16 @@ def client_url(self) -> str:
"""The client connection endpoint for the etcd server."""
return f"http://{self.ip}:{CLIENT_PORT}"

@property
def member_endpoint(self) -> str:
    """The `<member_name>=<peer_url>` entry describing this server as a cluster member."""
    return "{}={}".format(self.member_name, self.peer_url)

@property
def started(self) -> bool:
    """Whether this unit's relation data records the state `started`."""
    recorded_state = self.relation_data.get("state", None)
    return recorded_state == "started"


class EtcdCluster(RelationState):
"""State/Relation data collection for the etcd application."""
Expand All @@ -110,9 +121,9 @@ def __init__(
self.app = component

@property
def initial_cluster_state(self) -> str:
"""The initial cluster state ('new' or 'existing') of the etcd cluster."""
return self.relation_data.get("initial_cluster_state", "")
def cluster_state(self) -> str:
"""The cluster state ('new' or 'existing') of the etcd cluster."""
return self.relation_data.get("cluster_state", "")
reneradoi marked this conversation as resolved.
Show resolved Hide resolved

@property
def internal_user_credentials(self) -> dict[str, str]:
Expand All @@ -126,3 +137,28 @@ def internal_user_credentials(self) -> dict[str, str]:
def auth_enabled(self) -> bool:
"""Flag to check if authentication is already enabled in the Cluster."""
return self.relation_data.get("authentication", "") == "enabled"

@property
def cluster_members(self) -> str:
    """The members currently added to the etcd cluster, as one string.

    The value is stored under the key `cluster_members` in the relation data. It holds
    the `ETCD_INITIAL_CLUSTER` string reported by `etcdctl member add` when the juju
    leader adds a newly joined unit, and has to be passed as `--initial-cluster` when
    the workload is started on that unit.

    The first unit that initializes the cluster after deployment seeds this value with
    its own member endpoint. An empty string is returned if nothing has been stored yet.
    """
    members = self.relation_data.get("cluster_members", "")
    return members

@property
def learning_member(self) -> str:
    """The `member_id` of the member that is currently a learner, if any.

    New members join the etcd cluster as learning members: they do not take part in raft
    leader election because their data is not yet up to date. When the juju leader adds
    a unit with the `member add` command it stores the unit's `member_id` under the key
    `learning_member`; after promoting it to a full-voting member the leader resets the
    value. An empty string is returned if no learner is recorded.
    """
    learner_id = self.relation_data.get("learning_member", "")
    return learner_id
68 changes: 37 additions & 31 deletions src/events/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

from common.exceptions import (
EtcdAuthNotEnabledError,
EtcdClusterManagementError,
EtcdUserManagementError,
RaftLeaderNotFoundError,
)
from common.secrets import get_secret_from_id
from literals import INTERNAL_USER, INTERNAL_USER_PASSWORD_CONFIG, PEER_RELATION, Status
Expand Down Expand Up @@ -68,29 +68,31 @@ def _on_install(self, event: ops.InstallEvent) -> None:

def _on_start(self, event: ops.StartEvent) -> None:
"""Handle start event."""
# Make sure all planned units have joined the peer relation before starting the cluster
if (
not self.charm.state.peer_relation
or len(self.charm.state.peer_relation.units) + 1 < self.charm.app.planned_units()
self.charm.config_manager.set_config_properties()

if self.charm.unit.is_leader():
self.charm.cluster_manager.start_member()
reneradoi marked this conversation as resolved.
Show resolved Hide resolved

if not self.charm.state.cluster.auth_enabled:
try:
self.charm.cluster_manager.enable_authentication()
self.charm.state.cluster.update({"authentication": "enabled"})
except (EtcdAuthNotEnabledError, EtcdUserManagementError) as e:
logger.error(e)
self.charm.set_status(Status.AUTHENTICATION_NOT_ENABLED)
return
elif (
self.charm.state.unit_server.member_endpoint
in self.charm.state.cluster.cluster_members
):
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Deferring start because not all units joined peer-relation.")
self.charm.set_status(Status.NO_PEER_RELATION)
# this unit has been added to the etcd cluster
self.charm.cluster_manager.start_member()
else:
# this is a non-leader unit that has not been added to the cluster
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
# wait for leader to process `relation_joined` event and add the member to the cluster
event.defer()
return

self.charm.config_manager.set_config_properties()

self.charm.workload.start()

if self.charm.unit.is_leader() and not self.charm.state.cluster.auth_enabled:
try:
self.charm.cluster_manager.enable_authentication()
self.charm.state.cluster.update({"authentication": "enabled"})
except (EtcdAuthNotEnabledError, EtcdUserManagementError) as e:
logger.error(e)
self.charm.set_status(Status.AUTHENTICATION_NOT_ENABLED)
return

if self.charm.workload.alive():
self.charm.set_status(Status.ACTIVE)
else:
Expand All @@ -107,27 +109,31 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None:
def _on_cluster_relation_created(self, event: RelationCreatedEvent) -> None:
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
"""Handle event received by a new unit when joining the cluster relation."""
self.charm.state.unit_server.update(self.charm.cluster_manager.get_host_mapping())
reneradoi marked this conversation as resolved.
Show resolved Hide resolved
if self.charm.unit.is_leader():
self.charm.state.cluster.update({"initial-cluster-state": "new"})

def _on_cluster_relation_changed(self, event: RelationChangedEvent) -> None:
    """Promote a pending learner when the cluster peer relation changes.

    Only the juju leader acts, and only while a learning member is recorded in the
    cluster state. If the promotion fails, the error is logged and the event is deferred.
    """
    if not self.charm.unit.is_leader():
        return
    if not self.charm.state.cluster.learning_member:
        return

    try:
        # this will promote any learner, not only the unit that updated its relation data
        self.charm.cluster_manager.promote_learning_member()
    except EtcdClusterManagementError as err:
        logger.warning(err)
        event.defer()

def _on_cluster_relation_departed(self, event: RelationDepartedEvent) -> None:
    """Handle the event received when a unit leaves the cluster relation.

    Currently a no-op.
    """
    return None

def _on_cluster_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Handle event received by all units when a new unit joins the cluster relation."""
# Todo: remove this test at some point, this is just for showcasing that it works :)
# We will need to perform any HA-related action against the raft leader
# e.g. add members, trigger leader election, log compaction, etc.
try:
raft_leader = self.charm.cluster_manager.get_leader()
logger.info(f"Raft leader: {raft_leader}")
except RaftLeaderNotFoundError as e:
logger.warning(e)
if self.charm.unit.is_leader():
try:
self.charm.cluster_manager.add_member(event.unit.name)
except (EtcdClusterManagementError, KeyError) as e:
logger.warning(e)
event.defer()
return

def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
"""Handle all events in the 'cluster' peer relation."""
Expand Down
69 changes: 68 additions & 1 deletion src/managers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from common.client import EtcdClient
from common.exceptions import (
EtcdAuthNotEnabledError,
EtcdClusterManagementError,
EtcdUserManagementError,
RaftLeaderNotFoundError,
)
from core.cluster import ClusterState
from core.workload import WorkloadBase
from literals import INTERNAL_USER

logger = logging.getLogger(__name__)
Expand All @@ -22,8 +24,9 @@
class ClusterManager:
"""Manage cluster members, quorum and authorization."""

def __init__(self, state: ClusterState):
def __init__(self, state: ClusterState, workload: WorkloadBase):
self.state = state
self.workload = workload
self.admin_user = INTERNAL_USER
self.admin_password = self.state.cluster.internal_user_credentials.get(INTERNAL_USER, "")
self.cluster_endpoints = [server.client_url for server in self.state.servers]
Expand Down Expand Up @@ -85,3 +88,67 @@ def update_credentials(self, username: str, password: str) -> None:
client.update_password(username=username, new_password=password)
except EtcdUserManagementError:
raise

def add_member(self, unit_name: str) -> None:
    """Add the unit `unit_name` to the etcd cluster as a learner.

    On success the updated member list and the new member's id are written to the
    cluster state under `cluster_members` and `learning_member`.

    Raises:
        KeyError: if no server with this unit name is known, or if its member name,
            ip or peer url is still empty
        EtcdClusterManagementError: propagated from the etcd client if adding fails
    """
    # find the newly joined unit among the known EtcdServers
    joining = next(
        (server for server in self.state.servers if server.unit_name == unit_name),
        None,
    )
    if joining is None:
        member_name, ip, peer_url = "", "", ""
    else:
        member_name, ip, peer_url = joining.member_name, joining.ip, joining.peer_url

    # all required information must be available before adding the member
    if not (member_name and ip and peer_url):
        raise KeyError(f"Peer relation data for unit {unit_name} not found.")

    client = EtcdClient(
        username=self.admin_user,
        password=self.admin_password,
        client_url=self.state.unit_server.client_url,
    )
    cluster_members, member_id = client.add_member_as_learner(member_name, peer_url)
    self.state.cluster.update(
        {"cluster_members": cluster_members, "learning_member": member_id}
    )
    logger.info(f"Added unit {unit_name} as new cluster member {member_id}.")

def start_member(self) -> None:
    """Start the workload, flag this unit as started and initialize the cluster if needed."""
    self.workload.start()

    # updating the unit data triggers a relation_changed event, which the leader uses
    # to promote a learner-member to fully-voting member
    self.state.unit_server.update({"state": "started"})

    if self.state.cluster.cluster_state:
        # cluster has already been marked as initialized
        return

    # first start: mark the cluster as initialized with this unit as its only member
    initial_state = {
        "cluster_state": "existing",
        "cluster_members": self.state.unit_server.member_endpoint,
    }
    self.state.cluster.update(initial_state)

def promote_learning_member(self) -> None:
    """Promote the recorded learning member to a full-voting member.

    Reads the member id from the cluster state, promotes it through an etcd client
    connected to this unit's client url, then clears `learning_member` in the cluster
    state. An EtcdClusterManagementError from the client propagates to the caller, in
    which case the cluster state is left untouched.
    """
    learner_id = self.state.cluster.learning_member

    etcd = EtcdClient(
        username=self.admin_user,
        password=self.admin_password,
        client_url=self.state.unit_server.client_url,
    )
    etcd.promote_member(member_id=learner_id)

    self.state.cluster.update({"learning_member": ""})
    logger.info(f"Successfully promoted learning member {learner_id}.")
Loading
Loading