diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py new file mode 100644 index 000000000..5135f795e --- /dev/null +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""This charm library contains utilities to automatically forward your charm logs to a loki-push-api endpoint. + +(yes! charm code, not workload code!) + +This means that, if your charm is related to, for example, COS' Loki charm (or a Grafana Agent), +you will be able to inspect in real time from the Grafana dashboard the logs emitted by your charm. + +To start using this library, you need to do two things: +1) decorate your charm class with + +`@log_charm(loki_push_api_endpoint="my_logging_endpoint")` + +2) add to your charm a "my_logging_endpoint" (you can name this attribute whatever you like) **property** +that returns an http/https endpoint url. If you are using the `LogProxyConsumer` as +`self.logging = LogProxyConsumer(self, ...)`, the implementation could be: + +``` + @property + def my_logging_endpoint(self) -> List[str]: + '''Loki push API endpoints for charm logging''' + return self.logging.loki_endpoints: +``` + +The ``log_charm`` decorator will take these endpoints and set up the root logger (as in python's +logging module root logger) to forward all logs to these loki endpoints. + +## TLS support +If your charm integrates with a tls provider which is also trusted by Loki, you can configure TLS by +passing to charm logging a `server_cert` parameter. + +``` +@log_charm(loki_push_api_endpoint="my_logging_endpoint", server_cert="my_server_cert") +class MyCharm(...): + ... + + @property + def my_server_cert(self) -> Optional[str]: + '''Absolute path to a server crt if TLS is enabled.''' + if self.tls_is_enabled(): + return "/path/to/my/server_cert.crt" +``` + +""" +import copy +import functools +import json +import logging +import os +import string +import time +import urllib.error +from contextlib import contextmanager +from logging.config import ConvertingDict +from pathlib import Path +from typing import ( + Any, + Callable, + Dict, + Optional, + Tuple, + Type, + TypeVar, + Union, + cast, +) +from urllib import request, parse + +from cosl import JujuTopology +from ops.charm import CharmBase +from ops.framework import Framework + +# prevent infinite recursion because on failure urllib3 will push more logs +# https://github.com/GreyZmeem/python-logging-loki/issues/18 +logging.getLogger("urllib3").setLevel(logging.INFO) + +# The unique Charmhub library identifier, never change it +LIBID = "52ee6051f4e54aedaa60aa04134d1a6d" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +PYDEPS = ["cosl"] + +logger = logging.getLogger("charm_logging") +_GetterType = Union[Callable[[CharmBase], Optional[str]], property] +CHARM_LOGGING_ENABLED = "CHARM_LOGGING_ENABLED" + + +# from https://github.com/GreyZmeem/python-logging-loki, which seems to be dead +class LokiEmitter: + """Base Loki emitter class.""" + + #: Success HTTP status code from Loki API. + success_response_code: int = 204 + + #: Label name indicating logging level. + level_tag: str = "severity" + #: Label name indicating logger name. + logger_tag: str = "logger" + + #: String contains chars that can be used in label names in LogQL. + label_allowed_chars: str = "".join((string.ascii_letters, string.digits, "_")) + #: A list of pairs of characters to replace in the label name. + label_replace_with: Tuple[Tuple[str, str], ...] = ( + ("'", ""), + ('"', ""), + (" ", "_"), + (".", "_"), + ("-", "_"), + ) + + def __init__(self, url: str, tags: Optional[dict] = None, fields: Optional[dict] = None, cert: Optional[str] = None): + """Create new Loki emitter. + + Arguments: + url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). + tags: Default tags added to every log record. + fields: Default fields added to every log record. + cert: Absolute path to a ca cert for TLS authentication. + + """ + #: Tags that will be added to all records handled by this handler. + self.tags = tags or {} + # fields will be prepended to all records handled by this handler + self.prefix = " ".join(f"{field}={value}" for field, value in fields.items()) + " " if fields else "" + + #: Loki JSON push endpoint (e.g `http://127.0.0.1/loki/api/v1/push`) + self.url = url + #: Optional cert for TLS auth + self.cert = cert + + def __call__(self, record: logging.LogRecord, line: str): + """Send log record to Loki.""" + payload = self.build_payload(record, self.prefix + line) + req = request.Request(self.url, method='POST') + req.add_header('Content-Type', 'application/json; charset=utf-8') + jsondata_encoded = json.dumps(payload).encode("utf-8") + + try: + resp = request.urlopen( + req, + jsondata_encoded, + capath=self.cert + ) + except urllib.error.HTTPError as e: + logger.error(f"error pushing logs to {self.url}: {e.status, e.reason}") + return + + if resp.getcode() != self.success_response_code: + raise ValueError( + "Unexpected Loki API response status code: {0}".format(resp.status_code) + ) + + def build_payload(self, record: logging.LogRecord, line) -> dict: + """Build JSON payload with a log entry.""" + labels = self.build_tags(record) + ns = 1e9 + ts = str(int(time.time() * ns)) + stream = { + "stream": labels, + "values": [[ts, line]], + } + print(stream) + return {"streams": [stream]} + + @functools.lru_cache(256) + def format_label(self, label: str) -> str: + """Build label to match prometheus format. + + `Label format `_ + """ + for char_from, char_to in self.label_replace_with: + label = label.replace(char_from, char_to) + return "".join(char for char in label if char in self.label_allowed_chars) + + def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: + """Return tags that must be send to Loki with a log record.""" + tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags + tags = cast(Dict[str, Any], copy.deepcopy(tags)) + tags[self.level_tag] = record.levelname.lower() + tags[self.logger_tag] = record.name + + extra_tags = getattr(record, "tags", {}) + if not isinstance(extra_tags, dict): + return tags + + for tag_name, tag_value in extra_tags.items(): + cleared_name = self.format_label(tag_name) + if cleared_name: + tags[cleared_name] = tag_value + + return tags + + +class LokiHandler(logging.Handler): + """Log handler that sends log records to Loki. + + `Loki API ter/docs/api.md>` + """ + + def __init__( + self, + url: str, + tags: Optional[dict] = None, + fields: Optional[dict] = None, + # username, password tuple + cert: Optional[str] = None, + ): + """Create new Loki logging handler. + + Arguments: + url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). + tags: Default tags added to every log record. + + # FIXME: Session expects a .pem file it says + cert: Optional absolute path to cert file for TLS auth. + + """ + super().__init__() + self.emitter = LokiEmitter(url, tags, fields, cert) + + def emit(self, record: logging.LogRecord): + """Send log record to Loki.""" + # noinspection PyBroadException + try: + self.emitter(record, self.format(record)) + except Exception: + self.handleError(record) + + +def is_enabled() -> bool: + """Whether charm logging is enabled.""" + return os.getenv(CHARM_LOGGING_ENABLED, "1") == "1" + + +class CharmLoggingError(Exception): + """Base class for all exceptions raised by this module.""" + + +class InvalidEndpointError(CharmLoggingError): + """Raised if an endpoint is invalid.""" + + +@contextmanager +def charm_logging_disabled(): + """Contextmanager to temporarily disable charm logging. + + For usage in tests. + """ + previous = os.getenv(CHARM_LOGGING_ENABLED, "1") + os.environ[CHARM_LOGGING_ENABLED] = "0" + yield + os.environ[CHARM_LOGGING_ENABLED] = previous + + +_C = TypeVar("_C", bound=Type[CharmBase]) +_T = TypeVar("_T", bound=type) +_F = TypeVar("_F", bound=Type[Callable]) + + +def _get_logging_endpoints(logging_endpoints_getter, self, charm): + if isinstance(logging_endpoints_getter, property): + logging_endpoints = logging_endpoints_getter.__get__(self) + else: # method or callable + logging_endpoints = logging_endpoints_getter(self) + + if logging_endpoints is None: + logger.debug( + f"{charm}.{logging_endpoints_getter} returned None; quietly disabling " + f"charm_logging for the run." + ) + return None + + errors = [] + logging_endpoints = tuple(logging_endpoints) + sanitized_logging_endponts = [] + for endpoint in logging_endpoints: + if isinstance(endpoint, str): + sanitized_logging_endponts.append(endpoint) + else: + errors.append(f"invalid endpoint: expected string, got {endpoint!r}") + + if errors: + logger.error( + f"{charm}.{logging_endpoints_getter} should return an iterable of Loki push-api " + "(-compatible) endpoints (strings); " + f"ERRORS: {errors}" + ) + + return sanitized_logging_endponts + + +def _get_server_cert(server_cert_getter, self, charm): + if isinstance(server_cert_getter, property): + server_cert = server_cert_getter.__get__(self) + else: # method or callable + server_cert = server_cert_getter(self) + + if server_cert is None: + logger.warning( + f"{charm}.{server_cert_getter} returned None; sending logs over INSECURE connection." + ) + return None + + if not Path(server_cert).is_absolute(): + raise ValueError( + f"{charm}.{server_cert_getter} should return a valid tls cert absolute path (string | Path)); " + f"got {server_cert} instead." + ) + + if not Path(server_cert).exists(): + logger.warning(f"cert not found at {server_cert}: sending logs over INSECURE connection") + return None + + return server_cert + + +def _setup_root_logger_initializer( + charm: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType], + service_name: Optional[str] = None, +): + """Patch the charm's initializer and inject a call to set up root logging.""" + original_init = charm.__init__ + + @functools.wraps(original_init) + def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): + original_init(self, framework, *args, **kwargs) + + if not is_enabled(): + logger.info("Charm logging DISABLED by env: skipping root logger initialization") + return + + try: + logging_endpoints = _get_logging_endpoints(logging_endpoints_getter, self, charm) + except Exception: + # if anything goes wrong with retrieving the endpoint, we go on with logging disabled. + # better than breaking the charm. + logger.exception( + f"exception retrieving the logging " + f"endpoint from {charm}.{logging_endpoints_getter}; " + f"proceeding with charm_logging DISABLED. " + ) + return + + if not logging_endpoints: + return + + juju_topology = JujuTopology.from_charm(self) + tags = { + **juju_topology.as_dict(), + "service_name": service_name or self.app.name, + "charm_type_name": type(self).__name__, + "dispatch_path": os.getenv("JUJU_DISPATCH_PATH", ""), + } + fields = {} + try: + from charms.tempo_k8s.v1.charm_tracing import get_current_span + span = get_current_span() + if span: + logger.debug('root span found') + + fields['trace_id'] = str(hex(span.get_span_context().trace_id)[2:]) # strip 0x prefix + fields['span_id'] = str(hex(span.get_span_context().span_id)[2:]) # strip 0x prefix + + except ModuleNotFoundError: + logger.debug("no tracing library found: will not attach trace ids to logs") + + server_cert: Optional[Union[str, Path]] = ( + _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None + ) + + root_logger = logging.getLogger() + + for url in logging_endpoints: + handler = LokiHandler( + url=url, + tags=tags, + fields=fields, + cert=str(server_cert) if server_cert else None, + # auth=("username", "password"), + ) + + root_logger.addHandler(handler) + root_logger.debug( + "Initialized charm logger", + extra={"tags": {"endpoint": url}}, + ) + return + + charm.__init__ = wrap_init + + +def log_charm( + logging_endpoints: str, + server_cert: Optional[str] = None, + service_name: Optional[str] = None, +): + """Set up the root logger to forward any charm logs to one or more Loki push API endpoints. + + Usage: + >>> from charms.loki_k8s.v0.charm_logging import log_charm + >>> from charms.loki_k8s.v1.loki_push_api import LogProxyConsumer + >>> from ops import CharmBase + >>> + >>> @log_charm( + >>> logging_endpoints="loki_push_api_urls", + >>> ) + >>> class MyCharm(CharmBase): + >>> + >>> def __init__(self, framework: Framework): + >>> ... + >>> self.logging = LogProxyConsumer(self, ...) + >>> + >>> @property + >>> def loki_push_api_urls(self) -> Optional[List[str]]: + >>> return [endpoint['url'] for endpoint in self.logging.loki_endpoints] + >>> + :param server_cert: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param logging_endpoints: name of a property on the charm type that returns a sequence + of (fully resolvable) Loki push API urls. If None, charm logging will be effectively disabled. + Else, the root logger will be set up to forward all logs to those endpoints. + :param service_name: service name tag to attach to all logs generated by this charm. + Defaults to the juju application name this charm is deployed under. + """ + + def _decorator(charm_type: Type[CharmBase]): + """Autoinstrument the wrapped charmbase type.""" + _autoinstrument( + charm_type, + logging_endpoints_getter=getattr(charm_type, logging_endpoints), + server_cert_getter=getattr(charm_type, server_cert) if server_cert else None, + service_name=service_name, + ) + return charm_type + + return _decorator + + +def _autoinstrument( + charm_type: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType] = None, + service_name: Optional[str] = None, +) -> Type[CharmBase]: + """Set up logging on this charm class. + + Use this function to setup automatic log forwarding for all logs emitted throughout executions of + this charm. + + Usage: + + >>> from charms.loki_k8s.v0.charm_logging import _autoinstrument + >>> from ops.main import main + >>> _autoinstrument( + >>> MyCharm, + >>> logging_endpoints_getter=MyCharm.get_loki_endpoints, + >>> service_name="MyCharm", + >>> ) + >>> main(MyCharm) + + :param charm_type: the CharmBase subclass to autoinstrument. + :param server_cert_getter: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param logging_endpoints_getter: name of a property on the charm type that returns a sequence + of (fully resolvable) Loki push API urls. If None, charm logging will be effectively disabled. + Else, the root logger will be set up to forward all logs to those endpoints. + :param service_name: service name tag to attach to all logs generated by this charm. + Defaults to the juju application name this charm is deployed under. + """ + logger.info(f"instrumenting {charm_type}") + _setup_root_logger_initializer( + charm_type, + logging_endpoints_getter, + server_cert_getter=server_cert_getter, + service_name=service_name, + ) + return charm_type diff --git a/requirements.txt b/requirements.txt index cdbc96519..fb2c88777 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,7 @@ lightkube-models # Cryptography # Deps: tls_certificates cryptography + +# deps: tracing, charm_tracing +pydantic +opentelemetry-exporter-otlp-proto-http==1.21.0 diff --git a/src/charm.py b/src/charm.py index a3c294a57..0c6f6db1f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -25,11 +25,13 @@ from urllib.error import HTTPError, URLError from urllib.parse import urlparse +import ops import yaml from charms.alertmanager_k8s.v1.alertmanager_dispatch import AlertmanagerConsumer from charms.catalogue_k8s.v1.catalogue import CatalogueConsumer, CatalogueItem from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider +from charms.loki_k8s.v0.charm_logging import log_charm from charms.loki_k8s.v0.loki_push_api import ( LokiPushApiAlertRulesChanged, LokiPushApiProvider, @@ -96,6 +98,8 @@ def to_status(tpl: Tuple[str, str]) -> StatusBase: return StatusBase.from_name(name, message) +# call log_charm second, so that trace_charm has a chance to set up the root span +@log_charm(logging_endpoints="logging_endpoints", server_cert="server_ca_cert_path") @trace_charm( tracing_endpoint="tracing_endpoint", server_cert="server_ca_cert_path", @@ -114,6 +118,8 @@ class LokiOperatorCharm(CharmBase): _stored = StoredState() _port = HTTP_LISTEN_PORT _name = "loki" + _loki_push_api_endpoint = "/loki/api/v1/push" + _loki_rules_endpoint = "/loki/api/v1/rules" _ca_cert_path = "/usr/local/share/ca-certificates/cos-ca.crt" def __init__(self, *args): @@ -186,6 +192,7 @@ def __init__(self, *args): ], source_type="loki", source_url=self._external_url, + extra_fields=self._extra_ds_fields ) self.metrics_provider = MetricsEndpointProvider( @@ -206,7 +213,7 @@ def __init__(self, *args): address=external_url.hostname or self.hostname, port=external_url.port or 443 if self._certs_on_disk else 80, scheme=external_url.scheme, - path=f"{external_url.path}/loki/api/v1/push", + path=f"{external_url.path}{self._loki_push_api_endpoint}", ) self.dashboard_provider = GrafanaDashboardProvider(self) @@ -380,6 +387,31 @@ def hostname(self) -> str: return socket.getfqdn() @property + def internal_url(self): + """Fqdn plus appropriate scheme and server port.""" + scheme = "https" if self.server_cert.server_cert else "http" + return f"{scheme}://{self.hostname}:{self._port}" + + @property + def _extra_ds_fields(self) -> Dict: + try: + # super duper ugly, but we have no way to exchange datasource IDs atm. + tempo_datasource_uid = f"juju_{self.model.name}_{self.model.uuid}_{self.tracing.relations[0].app.name}_0" + except Exception: + logger.error("failed to construct extra ds fields") + return {} + + return { + "derivedFields": [ + { + "datasourceUid": tempo_datasource_uid, + "matcherRegex": r"traceId=(\w+)", + "url": "$${__value.raw}", + "name": "trace_id" + } + ] + } + @property def _external_url(self) -> str: """Return the external hostname to be passed to ingress via the relation.""" if ingress_url := self.ingress_per_unit.url: @@ -392,8 +424,7 @@ def _external_url(self) -> str: # are routable virtually exclusively inside the cluster (as they rely) # on the cluster's DNS service, while the ip address is _sometimes_ # routable from the outside, e.g., when deploying on MicroK8s on Linux. - scheme = "https" if self.server_cert.server_cert else "http" - return f"{scheme}://{self.hostname}:{self._port}" + return self.internal_url @property def scrape_jobs(self) -> List[Dict[str, Any]]: @@ -528,7 +559,8 @@ def _configure(self): # noqa: C901 scheme="https" if self._certs_on_disk else "http", port=self._port ) self.metrics_provider.update_scrape_job_spec(self.scrape_jobs) - self.grafana_source_provider.update_source(source_url=self._external_url) + # self.grafana_source_provider.update_source(source_url=self._external_url) + # self.grafana_source_provider.update_source(source_url=self.hostname) self.loki_provider.update_endpoint(url=self._external_url) self.catalogue.update_item(item=self._catalogue_item) @@ -568,6 +600,7 @@ def _update_cert(self): ) # Repeat for the charm container. We need it there for loki client requests. + # (and charm tracing and logging TLS) ca_cert_path.parent.mkdir(exist_ok=True, parents=True) ca_cert_path.write_text(self.server_cert.ca_cert) # pyright: ignore else: @@ -700,7 +733,7 @@ def _check_alert_rules(self): ssl_context = ssl.create_default_context( cafile=self._ca_cert_path if Path(self._ca_cert_path).exists() else None, ) - url = f"{self._internal_url}/loki/api/v1/rules" + url = f"{self._internal_url}{self._loki_rules_endpoint}" try: logger.debug(f"Checking loki alert rules via {url}.") urllib.request.urlopen(url, timeout=2.0, context=ssl_context) @@ -771,11 +804,18 @@ def tracing_endpoint(self) -> Optional[str]: return self.tracing.get_endpoint("otlp_http") return None + @property + def logging_endpoints(self) -> Optional[List[str]]: + """Loki endpoint for charm logging.""" + if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: + return ["http://localhost:3100" + self._loki_push_api_endpoint] + return [] + @property def server_ca_cert_path(self) -> Optional[str]: """Server CA certificate path for TLS tracing.""" if self._certs_in_reldata: - return self._ca_cert_path + return self._ca_cert_path if Path(self._ca_cert_path).exists() else None return None diff --git a/tests/integration/test_log_proxy_send_logs.py b/tests/integration/test_log_proxy_send_logs.py index 51d49ead3..c3be574b1 100644 --- a/tests/integration/test_log_proxy_send_logs.py +++ b/tests/integration/test_log_proxy_send_logs.py @@ -62,13 +62,20 @@ async def test_check_both_containers_send_logs(ops_test, loki_charm, log_proxy_t series = await loki_endpoint_request(ops_test, loki_app_name, "loki/api/v1/series", 0) data_series = json.loads(series)["data"] - assert len(data_series) == 3 + found = 0 for data in data_series: - assert data["container"] in ["workload-a", "workload-b"] - assert data["juju_application"] == tester_app_name - assert data["filename"] in [ - "/tmp/worload-a-1.log", - "/tmp/worload-a-2.log", - "/tmp/worload-b.log", - ] + if ( + data["container"] in ["workload-a", "workload-b"] + and data["juju_application"] == tester_app_name + and data["filename"] + in [ + "/tmp/worload-a-1.log", + "/tmp/worload-a-2.log", + "/tmp/worload-b.log", + ] + ): + found += 1 + + # there might be more data series (charm logging). + assert found == 3 diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py new file mode 100644 index 000000000..964ba705b --- /dev/null +++ b/tests/scenario/conftest.py @@ -0,0 +1,29 @@ +from unittest.mock import PropertyMock, patch + +import pytest +import scenario +from charm import LokiOperatorCharm + + +def tautology(*_, **__) -> bool: + return True + + +@pytest.fixture +def loki_charm(): + with ( + patch.multiple( + "charm.KubernetesComputeResourcesPatch", + _namespace=PropertyMock("test-namespace"), + _patch=PropertyMock(tautology), + is_ready=PropertyMock(tautology), + ), + patch("socket.getfqdn", new=lambda *args: "fqdn"), + patch("lightkube.core.client.GenericSyncClient"), + ): + yield LokiOperatorCharm + + +@pytest.fixture +def context(loki_charm): + return scenario.Context(loki_charm) diff --git a/tests/scenario/test_charm_logging.py b/tests/scenario/test_charm_logging.py new file mode 100644 index 000000000..12a20565f --- /dev/null +++ b/tests/scenario/test_charm_logging.py @@ -0,0 +1,60 @@ +import logging +from unittest.mock import patch + +import ops.pebble +import pytest +import scenario + + +@pytest.fixture +def loki_handler_mock(): + with patch("logging_loki.LokiHandler.handle") as h: + yield h + + +def test_no_endpoints_on_loki_not_ready(context, loki_handler_mock): + state = scenario.State( + containers=[ + scenario.Container( + "loki", + can_connect=True, + layers={"loki": ops.pebble.Layer({"services": {"loki": {}}})}, + service_status={"loki": ops.pebble.ServiceStatus.INACTIVE}, + exec_mock={("update-ca-certificates", "--fresh"): scenario.ExecOutput()}, + ) + ] + ) + + with context.manager("update-status", state) as mgr: + charm = mgr.charm + assert charm.logging_endpoints == [] + logging.getLogger("foo").debug("bar") + + loki_handler_mock.assert_not_called() + + +def test_endpoints_on_loki_ready(context, loki_handler_mock): + state = scenario.State( + containers=[ + scenario.Container( + "loki", + can_connect=True, + layers={"loki": ops.pebble.Layer({"services": {"loki": {}}})}, + service_status={"loki": ops.pebble.ServiceStatus.ACTIVE}, + exec_mock={("update-ca-certificates", "--fresh"): scenario.ExecOutput()}, + ) + ] + ) + + with context.manager("update-status", state) as mgr: + charm = mgr.charm + assert charm.logging_endpoints == ["http://fqdn:80/loki/api/v1/push"] + logging.getLogger("foo").debug("bar") + + loki_handler_mock.assert_called() + + for call in loki_handler_mock.call_args_list: + record = call.args[0] + if record.filename == __name__ + ".py": # log emitted by this module + assert record.msg == "bar" + assert record.name == "foo"