Fix ingress usage #349

Merged 8 commits on Oct 4, 2022
37 changes: 30 additions & 7 deletions src/charm.py
@@ -89,25 +89,33 @@ def __init__(self, *args):
resource_reqs_func=self._resource_reqs_from_config,
)

self.ingress = IngressPerUnitRequirer(self, relation_name="ingress", port=self._port)

self._topology = JujuTopology.from_charm(self)

external_url = urlparse(self.external_url)

self._scraping = MetricsEndpointProvider(
self,
relation_name="self-metrics-endpoint",
jobs=self.self_scraping_job,
external_url=self.external_url,
refresh_event=[ # needed for ingress
self.ingress.on.ready_for_unit,
self.ingress.on.revoked_for_unit,
self.on.update_status,
],
)
self.grafana_dashboard_provider = GrafanaDashboardProvider(charm=self)
self.metrics_consumer = MetricsEndpointConsumer(self)
self.ingress = IngressPerUnitRequirer(self, relation_name="ingress", port=self._port)

external_url = urlparse(self.external_url)
self._prometheus_server = Prometheus(web_route_prefix=external_url.path)

self.remote_write_provider = PrometheusRemoteWriteProvider(
charm=self,
relation_name=DEFAULT_REMOTE_WRITE_RELATION_NAME,
endpoint_address=external_url.hostname or "",
endpoint_port=external_url.port or self._port,
endpoint_port=external_url.port or 80,
endpoint_schema=external_url.scheme,
endpoint_path=f"{external_url.path}/api/v1/write",
)
@@ -153,10 +161,22 @@ def __init__(self, *args):
self.framework.observe(self.resources_patch.on.patch_failed, self._on_k8s_patch_failed)
self.framework.observe(self.on.validate_configuration_action, self._on_validate_config)

@property
def metrics_path(self):
"""The metrics path, adjusted by ingress path (if any)."""
return urlparse(self.external_url).path.rstrip("/") + "/metrics"

@property
def self_scraping_job(self):
"""The scrape job used by Prometheus to scrape itself during self-monitoring."""
return [{"static_configs": [{"targets": [f"*:{self._port}"]}]}]
port = urlparse(self.external_url).port or 80
return [
{
# `metrics_path` is automatically rendered by MetricsEndpointProvider, so no need
# to specify it here.
"static_configs": [{"targets": [f"*:{port}"]}],
}
]

@property
def log_level(self):
@@ -180,7 +200,7 @@ def _default_config(self):
"job_name": "prometheus",
"scrape_interval": "5s",
"scrape_timeout": "5s",
"metrics_path": "/metrics",
"metrics_path": self.metrics_path,
"honor_timestamps": True,
"scheme": "http",
"static_configs": [
@@ -410,10 +430,13 @@ def _update_config(self, container) -> bool:
return True

def _update_layer(self, container) -> bool:
current_services = container.get_plan().services
current_planned_services = container.get_plan().services
new_layer = self._prometheus_layer

if current_services == new_layer.services:
current_services = container.get_services() # mapping from str to ServiceInfo
all_svcs_running = all(svc.is_running() for svc in current_services.values())

if current_planned_services == new_layer.services and all_svcs_running:
return False

container.add_layer(self._name, new_layer, combine=True)
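For reference, a minimal, standalone sketch of the path and port handling that the new `metrics_path` and `self_scraping_job` properties rely on; the example URLs are made up for illustration, and only `urllib.parse` from the standard library is used.

from urllib.parse import urlparse

def metrics_path(external_url: str) -> str:
    # Same normalization as the new property: drop any trailing "/" from the
    # (possibly ingressed) path prefix, then append "/metrics".
    return urlparse(external_url).path.rstrip("/") + "/metrics"

def scrape_port(external_url: str, default: int = 80) -> int:
    # The self-scrape job and the remote-write endpoint fall back to port 80
    # when the external URL carries no explicit port.
    return urlparse(external_url).port or default

# Without ingress: no path prefix, explicit port.
assert metrics_path("http://prom-0.local:9090") == "/metrics"
assert scrape_port("http://prom-0.local:9090") == 9090

# Behind ingress, with a per-unit path prefix and no explicit port.
assert metrics_path("http://10.128.0.2/mymodel-prometheus-0/") == "/mymodel-prometheus-0/metrics"
assert scrape_port("http://10.128.0.2/mymodel-prometheus-0/") == 80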
22 changes: 5 additions & 17 deletions src/prometheus_server.py
@@ -32,23 +32,11 @@ def __init__(
when we relate to an ingress.
api_timeout: Optional; timeout (in seconds) to observe when interacting with the API.
"""
if web_route_prefix and not web_route_prefix.endswith("/"):
# If we do not add the '/' at the end, we will lose the last
# bit of the path:
#
# BAD:
#
# >>> urljoin('http://some/more', 'thing')
# 'http://some/thing'
#
# GOOD:
#
# >>> urljoin('http://some/more/', 'thing')
# 'http://some/more/thing'
#
web_route_prefix = f"{web_route_prefix}/"

self.base_url = urljoin(f"http://{host}:{port}", web_route_prefix)
web_route_prefix = web_route_prefix.lstrip("/").rstrip("/")
self.base_url = f"http://{host.rstrip('/')}:{port}/" + web_route_prefix
if not self.base_url.endswith("/"):
self.base_url += "/"

self.api_timeout = api_timeout

def reload_configuration(self) -> Union[bool, str]:
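The change above replaces `urljoin` with explicit prefix normalization. A short standalone sketch (host, port, and prefixes are illustrative) showing the pitfall the removed comment described and the behaviour of the new construction:

from urllib.parse import urljoin

def base_url(host: str, port: int, web_route_prefix: str) -> str:
    # Mirrors the replacement logic: strip leading/trailing "/" from the prefix
    # and guarantee the result ends with "/".
    prefix = web_route_prefix.lstrip("/").rstrip("/")
    url = f"http://{host.rstrip('/')}:{port}/" + prefix
    return url if url.endswith("/") else url + "/"

# The urljoin pitfall: without a trailing "/", the last path segment is dropped.
assert urljoin("http://some/more", "thing") == "http://some/thing"
assert urljoin("http://some/more/", "thing") == "http://some/more/thing"

# The new normalization is insensitive to leading/trailing slashes in the prefix.
assert base_url("prom-0.local", 9090, "/mymodel-prometheus-0") == "http://prom-0.local:9090/mymodel-prometheus-0/"
assert base_url("prom-0.local", 9090, "mymodel-prometheus-0/") == "http://prom-0.local:9090/mymodel-prometheus-0/"
assert base_url("prom-0.local", 9090, "") == "http://prom-0.local:9090/"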
236 changes: 236 additions & 0 deletions tests/integration/test_external_url.py
@@ -0,0 +1,236 @@
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.

"""Test various aspects of `external_url`.

1. When external_url is set (with path prefix) via traefik, default and self-scraping jobs are
'up'.
2. When external_url is set (with path prefix) via config option to a different value,
default and self-scraping jobs are 'up'.
"""

import asyncio
import json
import logging
import subprocess
import urllib.request

import pytest
from helpers import oci_image, unit_address
from pytest_operator.plugin import OpsTest
from workload import Prometheus

logger = logging.getLogger(__name__)

prometheus_app_name = "prometheus"
prometheus_resources = {"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")}
external_prom_name = "external-prometheus"

# Two prometheus units are sufficient to test potential interactions between multi-unit
# deployments and external_url
num_units = 2

# The period of time required to be idle before `wait_for_idle` returns is set to 90 sec because
# the default scrape_interval in prometheus is 1m.
idle_period = 90


async def test_setup_env(ops_test: OpsTest):
await ops_test.model.set_config(
{"logging-config": "<root>=WARNING; unit=DEBUG", "update-status-hook-interval": "60m"}
)


@pytest.mark.abort_on_fail
async def test_deploy(ops_test: OpsTest, prometheus_charm):
await asyncio.gather(
ops_test.model.deploy(
prometheus_charm,
resources=prometheus_resources,
application_name=prometheus_app_name,
num_units=num_units,
trust=True,
),
ops_test.model.deploy(
prometheus_charm,
resources=prometheus_resources,
application_name=external_prom_name, # to scrape the main prom
trust=True,
),
ops_test.model.deploy(
"ch:traefik-k8s",
application_name="traefik",
channel="edge",
),
)

await asyncio.gather(
ops_test.model.add_relation(
f"{prometheus_app_name}:self-metrics-endpoint", external_prom_name
),
ops_test.model.wait_for_idle(
apps=[prometheus_app_name],
status="active",
wait_for_units=num_units,
timeout=300,
),
ops_test.model.wait_for_idle(
apps=["traefik", external_prom_name],
wait_for_units=1,
timeout=300,
),
)


async def wait_for_ingress(ops_test: OpsTest):
"""Returns when all ingressed prometheuses are ready.

Wait until ingress is really ready.
Workaround for https://github.com/canonical/traefik-k8s-operator/issues/78.
"""

async def get_ingressed_endpoints():
action = (
await ops_test.model.applications["traefik"]
.units[0]
.run_action("show-proxied-endpoints")
)
res = (await action.wait()).results
# res looks like this:
# {'proxied-endpoints':
# '{"prometheus/0": {"url": "http://10.128.0.2:80/test-external-url-0lxt-prometheus-0"},
# "prometheus/1": {"url": "http://10.128.0.2:80/test-external-url-0lxt-prometheus-1"}
# }', 'return-code': 0}

proxied_endpoints = json.loads(res["proxied-endpoints"])
endpoints = [v["url"] for v in proxied_endpoints.values()]
return endpoints

ingressed_endpoints = await get_ingressed_endpoints()
logger.debug("Waiting for endpoints to become reachable: %s", ingressed_endpoints)
await ops_test.model.block_until(
lambda: all(Prometheus(ep).is_ready() for ep in ingressed_endpoints)
)


async def force_update_status(ops_test: OpsTest):
"""Force an update-status emission and wait for active/idle."""
await ops_test.model.set_config({"update-status-hook-interval": "10s"})
await asyncio.sleep(11)
await ops_test.model.set_config({"update-status-hook-interval": "60m"})
logger.debug("At this point, ingressed endpoints should become reachable and reldata updated")
await ops_test.model.wait_for_idle(
apps=[prometheus_app_name, "traefik", external_prom_name],
status="active",
timeout=600,
idle_period=idle_period,
)


async def test_jobs_are_up_via_traefik(ops_test: OpsTest):
# Set up microk8s metallb addon, needed by traefik
logger.info("(Re)-enabling metallb")
cmd = [
"sh",
"-c",
"ip -4 -j route | jq -r '.[] | select(.dst | contains(\"default\")) | .prefsrc'",
]
result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
ip = result.stdout.decode("utf-8").strip()

logger.info("First, disable metallb, just in case")
try:
cmd = ["sg", "microk8s", "-c", "microk8s disable metallb"]
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception as e:
print(e)
raise

await asyncio.sleep(30) # why? just because, for now

logger.info("Now enable metallb")
try:
cmd = ["sg", "microk8s", "-c", f"microk8s enable metallb:{ip}-{ip}"]
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception as e:
print(e)
raise

# GIVEN metallb is ready
await asyncio.sleep(30) # why? just because, for now

# WHEN prometheus is related to traefik
await ops_test.model.add_relation(f"{prometheus_app_name}:ingress", "traefik")

# Workaround to make sure everything is up-to-date: update-status
await ops_test.model.set_config({"update-status-hook-interval": "10s"})
await asyncio.sleep(11)
await ops_test.model.set_config({"update-status-hook-interval": "60m"})

logger.info("At this point, after re-enabling metallb, traefik should become active")
await ops_test.model.wait_for_idle(
apps=[prometheus_app_name, "traefik", external_prom_name],
status="active",
timeout=600,
idle_period=idle_period,
)

# THEN the prometheus API is served on metallb's IP and the model-app-unit path
def prom_url(unit: int) -> str:
return f"http://{ip}/{ops_test.model_name}-{prometheus_app_name}-{unit}"

# AND the default job is healthy (its scrape url must have the path for this to work)
prom_urls = [prom_url(i) + "/api/v1/targets" for i in range(num_units)]
for url in prom_urls:
logger.info("Attmpting to fetch targets from url: %s", url)
targets = urllib.request.urlopen(url, None, timeout=2).read().decode("utf8")
logger.info("Response: %s", targets)
assert '"health":"up"' in targets
assert '"health":"down"' not in targets

# Workaround to make sure everything is up-to-date:
# Ingress events are already passed as refresh_event to the MetricsEndpointProvider.
# TODO remove these two lines when https://github.com/canonical/traefik-k8s-operator/issues/78
# is fixed.
await wait_for_ingress(ops_test)
await force_update_status(ops_test)

# AND the self-scrape jobs are healthy (their scrape url must have the entire web_external_url
# for this to work).
external_prom_url = f"http://{await unit_address(ops_test, external_prom_name, 0)}:9090"
url = external_prom_url + "/api/v1/targets"
logger.info("Attmpting to fetch targets from url: %s", external_prom_url)
targets = urllib.request.urlopen(url, None, timeout=2).read().decode("utf8")
logger.info("Response: %s", targets)
assert '"health":"up"' in targets
assert '"health":"down"' not in targets


async def test_jobs_are_up_with_config_option_overriding_traefik(ops_test: OpsTest):
# GIVEN traefik ingress for prom
# (from previous test)

# WHEN the `web_external_url` config option is set
await ops_test.model.applications[prometheus_app_name].set_config(
{"web_external_url": "http://foo.bar/baz"},
)

await ops_test.model.wait_for_idle(
apps=[prometheus_app_name],
status="active",
timeout=300,
)

# THEN the prometheus api is served on the unit's IP and web_external_url's path
async def prom_url(unit: int) -> str:
return f"http://{await unit_address(ops_test, prometheus_app_name, unit)}:9090/baz"

# AND the default job is healthy (its scrape url must have the path for this to work)
prom_urls = [await prom_url(i) + "/api/v1/targets" for i in range(num_units)]
for url in prom_urls:
logger.info("Attmpting to fetch targets from url: %s", url)
targets = urllib.request.urlopen(url, None, timeout=2).read().decode("utf8")
logger.info("Response: %s", targets)
assert '"health":"up"' in targets
assert '"health":"down"' not in targets
12 changes: 12 additions & 0 deletions tests/unit/helpers.py
@@ -80,6 +80,18 @@ def wrapper_decorator(*args, **kwargs):
return wrapper_decorator


def cli_arg(plan, cli_opt):
plan_dict = plan.to_dict()
args = plan_dict["services"]["prometheus"]["command"].split()
for arg in args:
opt_list = arg.split("=")
if len(opt_list) == 2 and opt_list[0] == cli_opt:
return opt_list[1]
if len(opt_list) == 1 and opt_list[0] == cli_opt:
return opt_list[0]
return None


k8s_resource_multipatch = patch.multiple(
"charm.KubernetesComputeResourcesPatch",
_namespace="test-namespace",
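As a usage sketch for the relocated `cli_arg` helper: the stub below mimics the `to_dict()` interface of an `ops.pebble.Plan`, and the Prometheus command line it returns is a plausible example rather than one taken from this diff.

from helpers import cli_arg

class FakePlan:
    def to_dict(self):
        return {
            "services": {
                "prometheus": {
                    "command": (
                        "/bin/prometheus "
                        "--config.file=/etc/prometheus/prometheus.yml "
                        "--web.external-url=http://foo.bar/baz"
                    )
                }
            }
        }

plan = FakePlan()
assert cli_arg(plan, "--web.external-url") == "http://foo.bar/baz"   # value of a key=value option
assert cli_arg(plan, "--config.file") == "/etc/prometheus/prometheus.yml"
assert cli_arg(plan, "--web.enable-lifecycle") is None               # option not on the command line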
14 changes: 1 addition & 13 deletions tests/unit/test_charm.py
@@ -10,7 +10,7 @@

import ops
import yaml
from helpers import k8s_resource_multipatch, prom_multipatch
from helpers import cli_arg, k8s_resource_multipatch, prom_multipatch
from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus
from ops.testing import Harness

@@ -284,18 +284,6 @@ def scrape_config(config_yaml, job_name):
return None


def cli_arg(plan, cli_opt):
plan_dict = plan.to_dict()
args = plan_dict["services"]["prometheus"]["command"].split()
for arg in args:
opt_list = arg.split("=")
if len(opt_list) == 2 and opt_list[0] == cli_opt:
return opt_list[1]
if len(opt_list) == 1 and opt_list[0] == cli_opt:
return opt_list[0]
return None


@prom_multipatch
class TestConfigMaximumRetentionSize(unittest.TestCase):
"""Test the config.yaml option 'maximum_retention_size'."""