[DPE-4582] Cruise Control (#114)
Batalex authored Aug 2, 2024
1 parent e423623 commit 7e1cda5
Showing 55 changed files with 6,684 additions and 1,096 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -55,7 +55,6 @@ jobs:
integration-test:
strategy:
fail-fast: false
max-parallel: 2
matrix:
tox-environments:
- integration-charm
@@ -64,6 +63,7 @@ jobs:
- integration-password-rotation
- integration-tls
- integration-upgrade
# - integration-balancer
name: ${{ matrix.tox-environments }}
needs:
- lint
2 changes: 1 addition & 1 deletion README.md
@@ -11,7 +11,7 @@ The Charmed Kafka K8s Operator delivers automated operations management from day

The Kafka K8s Operator can be found on [Charmhub](https://charmhub.io/kafka-k8s) and it comes with features such as:
- Fault-tolerance, replication, scalability and high-availability out-of-the-box.
- - SASL/SCRAM auth for Broker-Broker and Client-Broker authenticaion enabled by default.
+ - SASL/SCRAM auth for Broker-Broker and Client-Broker authentication enabled by default.
- Access control management supported with user-provided ACL lists.

The Kafka K8s Operator uses the latest upstream Kafka binaries released by The Apache Software Foundation, made available using the [`ubuntu/kafka` OCI image](https://registry.hub.docker.com/r/ubuntu/kafka) distributed by Canonical.
17 changes: 16 additions & 1 deletion actions.yaml
@@ -23,7 +23,7 @@ resume-upgrade:
set-tls-private-key:
description: Sets the private key identifying the target unit, which will be used for certificate signing requests (CSR).
When updated, certificates will be reissued to the unit.
Run for each unit separately. Requires a valid relation to an application providing the `certificates` relation interface.
params:
internal-key:
type: string
@@ -35,3 +35,18 @@ get-admin-credentials:
description: Get administrator authentication credentials for client commands
The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration
This action must be called on the leader unit.

rebalance:
description: Trigger a rebalance of cluster partitions based on configured goals
params:
mode:
type: string
description: The operation to issue to the balancer. This action must be called on the leader unit.
'full' - runs a full rebalance of all partitions across the whole cluster
'add' - evenly distributes replicas to new and available brokers
'remove' - moves under-replicated partition replicas assigned to decommissioned brokers, to available ones
enum: [full, add, remove]
dryrun:
description: Only generate the partition rebalance proposals and estimated result, without executing
type: boolean
default: true
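The `mode` parameter maps naturally onto Cruise Control's own operations. Below is a minimal, hypothetical Python sketch of how the action parameters could be validated and translated into a Cruise Control request description; the endpoint names and the returned payload shape are assumptions for illustration, not taken from this charm's code.

```python
# Hypothetical sketch: validate the `rebalance` action parameters and translate them
# into a Cruise Control request description. Endpoint names and payload shape are
# assumptions for illustration, not the charm's implementation.

VALID_MODES = {"full", "add", "remove"}


def build_rebalance_request(mode: str, dryrun: bool = True) -> dict:
    """Map action params onto an (assumed) Cruise Control REST call description."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    endpoint = {"full": "rebalance", "add": "add_broker", "remove": "remove_broker"}[mode]
    return {"endpoint": endpoint, "params": {"dryrun": str(dryrun).lower()}}


print(build_rebalance_request("full"))                  # proposal only (dryrun defaults to true)
print(build_rebalance_request("remove", dryrun=False))  # actually move replicas off brokers
```

Keeping `dryrun` defaulted to `true` mirrors the action definition above: operators get the proposal and estimated result first, and only execute the partition movement when they explicitly pass `dryrun=false`.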
34 changes: 26 additions & 8 deletions config.yaml
@@ -2,6 +2,12 @@
# See LICENSE file for licensing details.

options:
roles:
description: |
Comma separated list of the roles assigned to the nodes of this cluster.
This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control).
type: string
default: broker
compression_type:
description: Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.
type: string
@@ -24,7 +30,7 @@ options:
default: "-1"
log_retention_ms:
description: The number of milliseconds to keep a log file before deleting it (in milliseconds).
type: string
default: "-1"
log_segment_bytes:
description: The maximum size of a single log file.
@@ -48,11 +54,11 @@ options:
default: false
log_cleaner_delete_retention_ms:
description: How long are delete records retained.
type: string
default: "86400000"
log_cleaner_min_compaction_lag_ms:
description: The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
type: string
default: "0"
log_cleanup_policy:
description: "The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: 'delete' and 'compact'"
@@ -78,15 +84,27 @@ options:
description: Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv). Overrides any explicit value set via the zookeeper.ssl.ciphersuites system property (note the single word "ciphersuites"). The default value of null means the list of enabled cipher suites is determined by the Java runtime being used.
type: string
default: ""
- profile:
- description: "Profile representing the scope of deployment, and used to enable high-level customisation of sysconfigs, resource checks/allocation, warning levels, etc. Allowed values are: “production”, “staging” and “testing”"
- type: string
- default: production
certificate_extra_sans:
description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate.
type: string
default: ""
+ profile:
+ description: 'Profile representing the scope of deployment, and used to enable high-level customisation of sysconfigs, resource checks/allocation, warning levels, etc. Allowed values are: “production”, “staging” and “testing”'
+ type: string
+ default: production
log_level:
- description: 'Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG'
+ description: "Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG"
type: string
default: "INFO"
network_bandwidth:
description: The network bandwidth available for the cloud that the charm is deployed to, in KB.
type: int
default: 50000
cruisecontrol_balance_threshold:
description: The maximum allowed extent of unbalance between brokers for cpu, disk and network utilization, and replica counts. For example, a value of `1.1` ensures that no broker should have >1.1x average utilization of all the brokers
type: float
default: 1.1
cruisecontrol_capacity_threshold:
description: The maximum percentage of the total cpu, disk and network capacity that is allowed to be used on a broker. For example, a value of `0.8` ensures that no broker should have >80% utilization
type: float
default: 0.8
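The two `cruisecontrol_*` thresholds bound how uneven and how full a broker may get before a rebalance would move replicas away from it. A small worked example of both checks, using illustrative disk numbers only (not charm defaults):

```python
# Illustrative numbers only (not charm defaults): check both Cruise Control thresholds
# against per-broker disk usage.

balance_threshold = 1.1    # cruisecontrol_balance_threshold
capacity_threshold = 0.8   # cruisecontrol_capacity_threshold

disk_used_gb = {"broker-0": 70.0, "broker-1": 95.0, "broker-2": 75.0}
disk_capacity_gb = 100.0

average = sum(disk_used_gb.values()) / len(disk_used_gb)  # 80.0 GB

for broker, used in disk_used_gb.items():
    unbalanced = used > balance_threshold * average               # more than 1.1x the cluster average
    over_capacity = used / disk_capacity_gb > capacity_threshold  # more than 80% of broker capacity
    print(f"{broker}: unbalanced={unbalanced}, over_capacity={over_capacity}")

# broker-1 trips both checks (95 > 88.0 and 0.95 > 0.8), so a rebalance proposal
# would move some of its replicas to broker-0 / broker-2.
```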
37 changes: 32 additions & 5 deletions lib/charms/data_platform_libs/v0/data_interfaces.py
@@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
- LIBPATCH = 35
+ LIBPATCH = 38

PYDEPS = ["ops>=2.0.0"]

@@ -642,22 +642,26 @@ def _move_to_new_label_if_needed(self):
return

# Create a new secret with the new label
old_meta = self._secret_meta
content = self._secret_meta.get_content()
self._secret_uri = None

# I wish we could just check if we are the owners of the secret...
try:
self._secret_meta = self.add_secret(content, label=self.label)
except ModelError as err:
if "this unit is not the leader" not in str(err):
raise
old_meta.remove_all_revisions()
self.current_label = None

def set_content(self, content: Dict[str, str]) -> None:
"""Setting cached secret content."""
if not self.meta:
return

# DPE-4182: do not create new revision if the content stays the same
if content == self.get_content():
return

if content:
self._move_to_new_label_if_needed()
self.meta.set_content(content)
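The guard added above for DPE-4182 makes `set_content` idempotent: re-writing identical content must not create a new secret revision. A minimal, self-contained sketch of that behaviour, where a plain list of dicts stands in for the Juju secret's revisions (`revisions` and this standalone `set_content` are illustrative, not the library API):

```python
# Sketch of the DPE-4182 guard: writing identical content must not create a new
# secret revision. A plain list of dicts stands in for the Juju secret's revisions.

revisions = [{"username": "admin", "password": "s3cr3t"}]


def set_content(content: dict) -> None:
    if content == revisions[-1]:  # unchanged content: skip, no new revision
        return
    revisions.append(content)


set_content({"username": "admin", "password": "s3cr3t"})    # no-op
set_content({"username": "admin", "password": "n3w-pass"})  # creates a new revision
print(len(revisions))  # 2
```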
@@ -1586,7 +1590,7 @@ def _register_secret_to_relation(
"""
label = self._generate_secret_label(relation_name, relation_id, group)

- # Fetchin the Secret's meta information ensuring that it's locally getting registered with
+ # Fetching the Secret's meta information ensuring that it's locally getting registered with
CachedSecret(self._model, self.component, label, secret_id).meta

def _register_secrets_to_relation(self, relation: Relation, params_name_list: List[str]):
@@ -2309,7 +2313,7 @@ def _secrets(self) -> dict:
return self._cached_secrets

def _get_secret(self, group) -> Optional[Dict[str, str]]:
"""Retrieveing secrets."""
"""Retrieving secrets."""
if not self.app:
return
if not self._secrets.get(group):
@@ -2602,6 +2606,14 @@ def set_version(self, relation_id: int, version: str) -> None:
"""
self.update_relation_data(relation_id, {"version": version})

def set_subordinated(self, relation_id: int) -> None:
"""Raises the subordinated flag in the application relation databag.
Args:
relation_id: the identifier for a particular relation.
"""
self.update_relation_data(relation_id, {"subordinated": "true"})


class DatabaseProviderEventHandlers(EventHandlers):
"""Provider-side of the database relation handlers."""
@@ -2838,6 +2850,21 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:

def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the database relation has changed."""
is_subordinate = False
remote_unit_data = None
for key in event.relation.data.keys():
if isinstance(key, Unit) and not key.name.startswith(self.charm.app.name):
remote_unit_data = event.relation.data[key]
elif isinstance(key, Application) and key.name != self.charm.app.name:
is_subordinate = event.relation.data[key].get("subordinated") == "true"

if is_subordinate:
if not remote_unit_data:
return

if remote_unit_data.get("state") != "ready":
return

# Check which data has changed to emit custom events.
diff = self._diff(event)
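The new handling above gates `relation-changed` processing when the provider application has raised the `subordinated` flag: events are ignored until the remote unit reports `state == "ready"`. A small, self-contained sketch of that gating logic, with plain dicts standing in for the relation databags (not the actual ops API):

```python
# Self-contained sketch of the subordinate gating added to _on_relation_changed_event:
# plain dicts stand in for the remote application/unit databags (not the ops API).
from typing import Optional


def should_process_changed_event(app_databag: dict, unit_databag: Optional[dict]) -> bool:
    """Return True if a relation-changed event should be processed by the requirer."""
    if app_databag.get("subordinated") != "true":
        return True                              # regular provider: always process
    if not unit_databag:
        return False                             # subordinate, but no unit data yet
    return unit_databag.get("state") == "ready"  # wait until the remote unit is ready


print(should_process_changed_event({"subordinated": "true"}, {"state": "starting"}))  # False
print(should_process_changed_event({"subordinated": "true"}, {"state": "ready"}))     # True
print(should_process_changed_event({}, None))                                         # True
```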

22 changes: 15 additions & 7 deletions lib/charms/data_platform_libs/v0/data_models.py
@@ -168,7 +168,7 @@ class MergedDataBag(ProviderDataBag, RequirerDataBag):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
- LIBPATCH = 3
+ LIBPATCH = 4

PYDEPS = ["ops>=2.0.0", "pydantic>=1.10,<2"]

@@ -177,6 +177,8 @@ class MergedDataBag(ProviderDataBag, RequirerDataBag):
AppModel = TypeVar("AppModel", bound=BaseModel)
UnitModel = TypeVar("UnitModel", bound=BaseModel)

DataBagNativeTypes = (int, str, float)


class BaseConfigModel(BaseModel):
"""Class to be used for defining the structured configuration options."""
@@ -231,10 +233,15 @@ def write(relation_data: RelationDataContent, model: BaseModel):
relation_data: pointer to the relation databag
model: instance of pydantic model to be written
"""
- for key, value in model.dict(exclude_none=True).items():
- relation_data[key.replace("_", "-")] = (
- str(value) if isinstance(value, str) or isinstance(value, int) else json.dumps(value)
- )
+ for key, value in model.dict(exclude_none=False).items():
+ if value:
+ relation_data[key.replace("_", "-")] = (
+ str(value)
+ if any(isinstance(value, _type) for _type in DataBagNativeTypes)
+ else json.dumps(value)
+ )
+ else:
+ relation_data[key.replace("_", "-")] = ""


def read(relation_data: MutableMapping[str, str], obj: Type[T]) -> T:
@@ -248,10 +255,11 @@ def read(relation_data: MutableMapping[str, str], obj: Type[T]) -> T:
**{
field_name: (
relation_data[parsed_key]
- if field.annotation in [int, str, float]
+ if field.outer_type_ in DataBagNativeTypes
else json.loads(relation_data[parsed_key])
)
- for field_name, field in obj.__fields__.items()  # pyright: ignore[reportGeneralTypeIssues]
+ for field_name, field in obj.__fields__.items()
+ # pyright: ignore[reportGeneralTypeIssues]
if (parsed_key := field_name.replace("_", "-")) in relation_data
if relation_data[parsed_key]
}
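The updated `write()` now serialises every field, including falsy ones: native types (`int`, `str`, `float`) are written with `str()`, anything else is JSON-encoded, and `None`/empty values become empty strings. A minimal sketch of that rule, with a plain dict standing in for the pydantic model and the relation databag (the field names are made up for illustration):

```python
import json

# Mirrors the updated write(): dash-cased keys, str() for native types, JSON for the
# rest, and "" for falsy/None values. Plain dicts stand in for the model and databag.
DataBagNativeTypes = (int, str, float)


def write(relation_data: dict, model_dict: dict) -> None:
    for key, value in model_dict.items():
        if value:
            relation_data[key.replace("_", "-")] = (
                str(value)
                if any(isinstance(value, _type) for _type in DataBagNativeTypes)
                else json.dumps(value)
            )
        else:
            relation_data[key.replace("_", "-")] = ""


databag: dict = {}
write(databag, {"topic_name": "events", "partitions": 6, "extra_user_roles": ["admin"], "tls": None})
print(databag)
# {'topic-name': 'events', 'partitions': '6', 'extra-user-roles': '["admin"]', 'tls': ''}
```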
40 changes: 32 additions & 8 deletions lib/charms/data_platform_libs/v0/upgrade.py
@@ -285,7 +285,7 @@ def restart(self, event) -> None:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
- LIBPATCH = 15
+ LIBPATCH = 18

PYDEPS = ["pydantic>=1.10,<2", "poetry-core"]

@@ -501,7 +501,7 @@ class DataUpgrade(Object, ABC):

STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"]

- on = UpgradeEvents()  # pyright: ignore [reportGeneralTypeIssues]
+ on = UpgradeEvents()  # pyright: ignore [reportAssignmentType]

def __init__(
self,
@@ -606,6 +606,21 @@ def upgrade_stack(self, stack: List[int]) -> None:
self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)})
self._upgrade_stack = stack

@property
def other_unit_states(self) -> list:
"""Current upgrade state for other units.
Returns:
Unsorted list of upgrade states for other units.
"""
if not self.peer_relation:
return []

return [
self.peer_relation.data[unit].get("state", "")
for unit in list(self.peer_relation.units)
]

@property
def unit_states(self) -> list:
"""Current upgrade state for all units.
@@ -892,10 +907,21 @@ def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
logger.error(e)
self.set_unit_failed()
return
top_unit_id = self.upgrade_stack[-1]
top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}")
if (
top_unit == self.charm.unit
and self.peer_relation.data[self.charm.unit].get("state") == "recovery"
):
# While in a rollback and the Juju leader unit is the top unit in the upgrade stack, emit the event
# for this unit to start the rollback.
self.peer_relation.data[self.charm.unit].update({"state": "ready"})
self.on_upgrade_changed(event)
return
self.charm.unit.status = WaitingStatus("other units upgrading first...")
self.peer_relation.data[self.charm.unit].update({"state": "ready"})

- if self.charm.app.planned_units() == 1:
+ if len(self.app_units) == 1:
# single unit upgrade, emit upgrade_granted event right away
getattr(self.on, "upgrade_granted").emit()

@@ -926,11 +952,8 @@ def on_upgrade_changed(self, event: EventBase) -> None:
return

if self.substrate == "vm" and self.cluster_state == "recovery":
- # Only defer for vm, that will set unit states to "ready" on upgrade-charm
- # on k8s only the upgrading unit will receive the upgrade-charm event
- # and deferring will prevent the upgrade stack from being popped
- logger.debug("Cluster in recovery, deferring...")
- event.defer()
+ # skip run while in recovery. The event will be retriggered when the cluster is ready
+ logger.debug("Cluster in recovery, skip...")
return

# if all units completed, mark as complete
@@ -981,6 +1004,7 @@ def on_upgrade_changed(self, event: EventBase) -> None:
self.charm.unit == top_unit
and top_state in ["ready", "upgrading"]
and self.cluster_state == "ready"
and "upgrading" not in self.other_unit_states
):
logger.debug(
f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
