Merge pull request #93 from LamaAni/fix_logger_follow_and_since_times
Fix logger follow and since times
LamaAni authored Nov 22, 2023
2 parents cee09aa + d7deabe commit d821c97
Showing 29 changed files with 1,654 additions and 405 deletions.
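
Context for the changes below: the pod-log query now tracks the timestamp of the last line it read and restarts the follow query from that point when the server ends the stream. A rough usage sketch — class names are taken from this diff, but the exact import surface, event name, and handler binding are assumptions, not confirmed by the commit:

from datetime import datetime, timezone

# Hypothetical wiring; KubeApiRestClient and GetPodLogs are the classes
# patched in this commit. start(client) is shown in client.py below.
from airflow_kubernetes_job_operator.kube_api import KubeApiRestClient, GetPodLogs

client = KubeApiRestClient()
query = GetPodLogs(
    name="my-pod",  # hypothetical pod name
    namespace="default",
    since=datetime.now(timezone.utc),  # naive datetimes are now localized by the fix
    follow=True,  # the new loop restarts the query if the server terminates it
)
query.on("log", lambda line: print(line))  # assumption: zthreading-style event binding
query.start(client)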
45 changes: 34 additions & 11 deletions airflow_kubernetes_job_operator/kube_api/client.py
@@ -5,6 +5,7 @@
import time

from typing import List, Callable, Set, Union
import uuid
from weakref import WeakSet
from zthreading.tasks import Task
from zthreading.events import Event, EventHandler
@@ -110,6 +111,8 @@ def __init__(
thread_name=f"{self.__class__.__name__} {id(self)}",
)

self.__query_id = str(uuid.uuid4())

self.resource_path = resource_path
self.timeout = timeout
self.path_params = path_params or dict()
@@ -119,6 +122,7 @@ def __init__(
self.method = method
self.files = files
self.body = body

self.auto_reconnect = auto_reconnect
self.auto_reconnect_max_attempts = auto_reconnect_max_attempts
self.auto_reconnect_wait_between_attempts = auto_reconnect_wait_between_attempts
@@ -137,6 +141,16 @@ def __init__(
KubeApiRestQueryConnectionState.Disconnected
)

@property
def query_id(self) -> str:
"""A unique query id"""
return self.__query_id

@property
def debug_tag(self) -> str:
"""The debug log tag"""
return f"[{type(self).__name__}][{self.query_id[-4:]}][{self.resource_path}]"

@property
def query_running(self) -> bool:
"""True if the query is executing (connecting or streaming)"""
@@ -153,7 +167,7 @@ def _set_connection_state(
if self._connection_state == state:
return
self._connection_state = state
kube_logger.debug(f"[{self.resource_path}] {self._connection_state}")
kube_logger.debug(f"{self.debug_tag} State: {self._connection_state}")
if emit_event:
self.emit(self.connection_state_changed_event_name, state)

@@ -307,8 +321,8 @@ def can_reconnect():
and self.auto_reconnect_wait_between_attempts > 0
):
kube_logger.debug(
f"[{self.resource_path}][Reconnect] Sleeping for "
+ f"{self.auto_reconnect_wait_between_attempts}"
f"{self.debug_tag} Waiting before reconnect "
+ f"{self.auto_reconnect_wait_between_attempts} [s]"
)
time.sleep(self.auto_reconnect_wait_between_attempts)

@@ -327,7 +341,7 @@ def can_reconnect():
break

kube_logger.debug(
f"[{self.resource_path}] Connection lost, reconnecting.."
f"{self.debug_tag} Connection lost, reconnecting.."
)

# generating the query params
@@ -388,16 +402,16 @@ def can_reconnect():
except Exception:
pass

exeuctor_name = (
executor_name = (
f"{self.__class__.__module__}.{self.__class__.__name__}"
)

if isinstance(ex.body, dict):
exception_message = (
f"{exeuctor_name}, {ex.reason}: {ex.body.get('message')}"
f"{executor_name}, {ex.reason}: {ex.body.get('message')}"
)
else:
exception_message = f"{exeuctor_name}, {ex.reason}: {ex.body}"
exception_message = f"{executor_name}, {ex.reason}: {ex.body}"

err = KubeApiClientException(
exception_message, rest_api_exception=ex
@@ -411,16 +425,25 @@ def can_reconnect():

# check if can reconnect.
if can_reconnect():
kube_logger.debug(f"[{self.resource_path}] {exception_message}")
kube_logger.debug(
f"{self.debug_tag} KubeApi query reconnect: {exception_message}"
)
continue
else:
kube_logger.debug(
f"{self.debug_tag} KubeApi query disconnected. "
+ "Could not reconnect (Auto reconnect: {self.auto_reconnect})"
)

# check if is currently being stopped or already stopped.
if self.is_running and not self._is_being_stopped:
raise err
raise err from KubeApiException("Error while executing query")
else:
kube_logger.debug(f"Query stopped with: {exception_message}")
else:
raise ex
raise ex from KubeApiException("Error while executing query")
except Exception as ex:
raise ex
raise ex from KubeApiException("Error while executing query")

def start(self, client: "KubeApiRestClient"):
"""Start the query execution
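
The new query_id/debug_tag pair above gives every query instance a short, stable tag so debug lines from concurrent queries can be correlated across reconnects. The pattern in isolation — a minimal sketch, not the repository's full class:

import logging
import uuid

logging.basicConfig(level=logging.DEBUG)
kube_logger = logging.getLogger("kube_api")

class TaggedQuery:
    # Mirrors the tagging added to KubeApiRestQuery above.
    def __init__(self, resource_path: str):
        self.resource_path = resource_path
        self.__query_id = str(uuid.uuid4())  # unique per query instance

    @property
    def query_id(self) -> str:
        return self.__query_id

    @property
    def debug_tag(self) -> str:
        # Only the last 4 uuid chars keep the tag short but distinct.
        return f"[{type(self).__name__}][{self.query_id[-4:]}][{self.resource_path}]"

q = TaggedQuery("/api/v1/namespaces/default/pods/my-pod/log")
kube_logger.debug(f"{q.debug_tag} State: Connected")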
121 changes: 103 additions & 18 deletions airflow_kubernetes_job_operator/kube_api/queries.py
@@ -1,10 +1,12 @@
import re
from logging import Logger
import logging
from datetime import datetime
import json
from typing import Union, List, Callable
from time import sleep
import dateutil.parser

from logging import Logger
from datetime import datetime, timedelta, timezone
from typing import Union, List, Callable
from zthreading.events import Event

from airflow_kubernetes_job_operator.kube_api.exceptions import (
@@ -126,6 +128,7 @@ def __init__(
add_container_name_to_log: bool = None,
enable_kube_api_events: bool = None,
api_event_match_regexp: str = None,
auto_reset_last_line_timestamp: bool = None,
):
"""Returns the pod logs for a pod. Can follow the pod logs
in real time.
@@ -146,6 +149,9 @@ def __init__(
api_event_match_regexp (str, optional): Kube api event match regexp.
Must have exactly two match groups (event name, value).
Defaults to GetPodLogs.api_event_match_regexp.
auto_reset_last_line_timestamp (bool, optional): If False, does not reset the last read line timestamp
between queries, so subsequent calls to the query produce newer lines only.
Defaults to (not follow).
Event binding:
To send a kube api event, place the following string in the pod output log,
@@ -175,7 +181,16 @@ def __init__(
self.kind = kind
self.name: str = name
self.namespace: str = namespace
if since and since.tzinfo is None:
since = since.astimezone()

self.since: datetime = since
self.auto_reset_last_line_timestamp: bool = (
auto_reset_last_line_timestamp
if auto_reset_last_line_timestamp is not None
else not follow
)
self.follow_restart_query_timeout = timedelta(seconds=0.5)
self.container = container
self.query_params = {
"follow": follow,
@@ -187,7 +202,9 @@ def __init__(
self.query_params["container"] = container

self.since = since
self._last_timestamp = None
self.__follow = follow
self.__last_log_line_timestamp = None
self.__query_start_tail_offset_seconds = None
self._active_namespace = None
self.add_container_name_to_log = (
add_container_name_to_log
@@ -203,26 +220,79 @@ def __init__(
api_event_match_regexp or GetPodLogs.api_event_match_regexp
)

def pre_request(self, client: "KubeApiRestClient"):
super().pre_request(client)
@property
def follow(self) -> bool:
return self.__follow

# Updating the since argument.
last_ts = (
self.since
if self.since is not None and self.since > self._last_timestamp
else self._last_timestamp
def _get_last_read_line_timestamp(self):
return (
self.__last_log_line_timestamp
or self.since
or datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)
)

since_seconds = None
if last_ts is not None:
since_seconds = (datetime.now() - last_ts).total_seconds()
if since_seconds < 0:
since_seconds = 0
def _execute_query(self, client: KubeApiRestClient):
# Loop override to enable follow.
# The log query may end at some point (the server may not allow a query to run that long),
# so we must loop the execution of the underlying query.

if self.auto_reset_last_line_timestamp:
self.__last_log_line_timestamp = None

def can_loop_on_query():
if not self.follow:
return False
if self._is_being_stopped:
return False
return self.is_running

# A re-sent query will restart execution and call all execution events.
was_restarted = False
while True:
try:
super()._execute_query(client)
except KubeApiClientException as ex:
was_not_found = (
isinstance(ex.rest_api_exception.body, dict)
and ex.rest_api_exception.body.get("reason", None) == "NotFound"
)
if was_restarted and was_not_found:
kube_logger.debug(
f"{self.debug_tag} Logging stopped, resource not found after restart (Was it deleted?)"
)
break
raise ex

# checking if can loop
if not can_loop_on_query():
break

# Sleeping before query restart
if self.follow_restart_query_timeout:
sleep(self.follow_restart_query_timeout.total_seconds())
# Second check, since we slept and waited for execution.
if not can_loop_on_query():
break

was_restarted = True
kube_logger.debug(
f"{self.debug_tag} Get logs query restarted, following"
+ f" (last read line @ {self.__last_log_line_timestamp})"
)

def __update_query_since(self):
# Updating the since argument.
since = self._get_last_read_line_timestamp()
self.query_params["sinceTime"] = since.isoformat()

self.query_params["sinceSeconds"] = since_seconds
def pre_request(self, client: "KubeApiRestClient"):
super().pre_request(client)
self.__update_query_since()

def on_reconnect(self, client: KubeApiRestClient):
# updating the since property.
self.__update_query_since()

if not self.query_running or not self.auto_reconnect:
# if the query is not running then we have reached the pod's log end.
# we should disconnect, otherwise we should have had an error.
@@ -267,10 +337,25 @@ def parse_and_emit_events(self, line: LogLine):

return has_events

def __update_last_timestamp(self, timestamp: datetime):
if (
self.__last_log_line_timestamp is None
or self.__last_log_line_timestamp < timestamp
):
self.__last_log_line_timestamp = timestamp

def parse_data(self, message_line: str):
self._last_timestamp = datetime.now()
timestamp = dateutil.parser.isoparse(message_line[: message_line.index(" ")])

if (
self.__last_log_line_timestamp
and timestamp <= self.__last_log_line_timestamp
):
# Older, no need to read.
return []

self.__update_last_timestamp(timestamp)

message = message_line[message_line.index(" ") + 1 :] # noqa: E203
message = message.replace("\r", "")

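
The since-time handling above is the heart of the fix: the query reads timestamped log lines, remembers the newest timestamp it has parsed, resumes via sinceTime on every reconnect or follow restart, and drops any lines the server replays. The dedup logic in isolation — a sketch assuming lines arrive as "<iso-timestamp> <message>", which requires the log query to request timestamped output:

from datetime import datetime
from typing import List, Optional

import dateutil.parser

class LogLineDeduplicator:
    # Mirrors the last-read-line tracking added to GetPodLogs above.
    def __init__(self):
        self._last_ts: Optional[datetime] = None

    def parse(self, message_line: str) -> List[str]:
        # Kubernetes prefixes each line with an RFC 3339 timestamp
        # when the log request asks for timestamps.
        sep = message_line.index(" ")
        timestamp = dateutil.parser.isoparse(message_line[:sep])
        if self._last_ts is not None and timestamp <= self._last_ts:
            # Replayed on restart; already read. Note that, as in the diff,
            # distinct lines sharing the last read timestamp are also dropped.
            return []
        self._last_ts = timestamp
        return [message_line[sep + 1:]]

dedup = LogLineDeduplicator()
assert dedup.parse("2023-11-22T10:00:00Z hello") == ["hello"]
assert dedup.parse("2023-11-22T10:00:00Z hello") == []  # duplicate dropped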
9 changes: 7 additions & 2 deletions airflow_kubernetes_job_operator/kubernetes_job_operator.py
@@ -1,5 +1,6 @@
import jinja2
import json
from logging import Logger
from typing import List, Union
from airflow.utils.decorators import apply_defaults
from airflow_kubernetes_job_operator.kube_api import KubeResourceState, KubeLogApiEvent
@@ -366,12 +367,17 @@ def _validate_job_runner(self):
def create_job_runner(self) -> JobRunner:
"""Override this method to create your own or augment the job runner"""
# create the job runner.
logger: Logger = None
if hasattr(self, "logger"):
logger = self.logger
if not isinstance(self.logger, Logger):
logger = logger()
return JobRunner(
body=self.body,
namespace=self.namespace,
show_pod_logs=self.get_logs,
delete_policy=self.delete_policy,
logger=self.logger if hasattr(self, "logger") else None,
logger=logger,
auto_load_kube_config=True,
name_prefix=self._create_kubernetes_job_name_prefix(
self.name_prefix if self.name_prefix is not None else self.task_id
Expand Down Expand Up @@ -454,7 +460,6 @@ def execute(self, context):
)
self._job_is_executing = True
try:

rslt = self.job_runner.execute_job(
watcher_start_timeout=self.startup_timeout_seconds,
timeout=self._internal_wait_kuberentes_object_timeout,
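
The operator change above tolerates self.logger being either a Logger instance or a zero-argument callable returning one (Airflow versions differ in how the attribute is exposed). The resolution pattern in isolation — a sketch; the class and logger names here are hypothetical:

from logging import Logger, getLogger

def resolve_logger(obj) -> Logger:
    # Accept either a Logger attribute or a callable returning one,
    # as in create_job_runner above.
    logger = getattr(obj, "logger", None)
    if logger is not None and not isinstance(logger, Logger):
        logger = logger()
    return logger

class OperatorWithLoggerMethod:
    def logger(self) -> Logger:  # hypothetical: logger exposed as a method
        return getLogger("airflow.task")

assert isinstance(resolve_logger(OperatorWithLoggerMethod()), Logger)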
9 changes: 7 additions & 2 deletions examples/dags/test_job_operator.py
@@ -1,10 +1,15 @@
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow_kubernetes_job_operator.kubernetes_job_operator import (
KubernetesJobOperator,
)
from airflow.utils.dates import days_ago

default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
dag = DAG(
"job-operator-example", default_args=default_args, description="Test base job operator", schedule_interval=None
"job-operator-example",
default_args=default_args,
description="Test base job operator",
schedule_interval=None,
)


4 changes: 3 additions & 1 deletion experimental/core_tester/test_threaded_kubernetes_watch.py
@@ -1,7 +1,9 @@
import kubernetes
import os
from utils import logging
from airflow_kubernetes_job_operator.threaded_kubernetes_watch import ThreadedKubernetesWatchNamspeace
from airflow_kubernetes_job_operator.threaded_kubernetes_watch import (
ThreadedKubernetesWatchNamspeace,
)

logging.basicConfig(level="INFO")
CUR_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
@@ -29,7 +29,9 @@
watcher.on("warning", lambda warning: logging.warning(warning))

try:
for msg in watcher.stream(client=client, name="tester", namespace=current_namespace):
for msg in watcher.stream(
client=client, name="tester", namespace=current_namespace
):
logging.info(msg)
finally:
logging.info("Stopping the namespace reader...")
5 changes: 4 additions & 1 deletion setup.py
@@ -41,7 +41,10 @@ def get_version():
author="Zav Shotan",
author_email="",
url="https://github.com/LamaAni/KubernetesJobOperator",
packages=["airflow_kubernetes_job_operator", "airflow_kubernetes_job_operator/kube_api"],
packages=[
"airflow_kubernetes_job_operator",
"airflow_kubernetes_job_operator/kube_api",
],
platforms="any",
license="LICENSE",
install_requires=[