Skip to content

Commit

Permalink
Merge pull request #81 from LamaAni/throw_general_errors
Browse files Browse the repository at this point in the history
Error while throwing general connection errors through the event service
  • Loading branch information
LamaAni authored Dec 19, 2022
2 parents 4604e4a + 90f6c2d commit f0ccada
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions airflow_kubernetes_job_operator/kube_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(
"""
assert use_asyncio is not True, NotImplementedError("AsyncIO not yet implemented.")
super().__init__(
self._exdcute_query,
self._execute_query,
use_async_loop=use_asyncio or KubeApiRestQuery.default_use_asyncio,
use_daemon_thread=True,
thread_name=f"{self.__class__.__name__} {id(self)}",
Expand Down Expand Up @@ -221,18 +221,20 @@ def on_reconnect(self, client: "KubeApiRestClient"):
"""
return True

def _exdcute_query(self, client: "KubeApiRestClient"):
def _execute_query(self, client: "KubeApiRestClient"):
self._set_connection_state(KubeApiRestQueryConnectionState.Disconnected, False)
self.emit(self.query_started_event_name, self, client)
self.pre_request(client)

try:
self.query_loop(client)
self.post_request(client)
except Exception as ex:
self.emit_error(ex)
raise ex
finally:
self._set_connection_state(KubeApiRestQueryConnectionState.Disconnected)

self.post_request(client)
self.emit(self.query_ended_event_name, self, client)
self.emit(self.query_ended_event_name, self, client)

def query_loop(self, client: "KubeApiRestClient"):
"""Overridable. The main query loop. Called to execute the query.
Expand Down Expand Up @@ -605,16 +607,18 @@ def _create_query_handler(self, queries: List[KubeApiRestQuery]) -> EventHandler

pending = set(queries)

def remove_from_pending(q):
def remove_from_pending(q, ex: Exception = None):
if q in pending:
pending.remove(q)
if ex:
handler.emit_error(ex)
if len(pending) == 0:
handler.stop_all_streams()

for q in queries:
self._active_queries.add(q)
q.on(q.query_ended_event_name, lambda query, client: remove_from_pending(query))
q.on(q.error_event_name, lambda query, err: remove_from_pending(query))
q.on(q.error_event_name, lambda query, err: remove_from_pending(query, err))
q.pipe(handler)

return handler
Expand Down

0 comments on commit f0ccada

Please sign in to comment.