Skip to content

Commit

Permalink
Catch up with main
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald committed Jul 26, 2024
1 parent 26ad79d commit e57689e
Showing 1 changed file with 95 additions and 57 deletions.
152 changes: 95 additions & 57 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,40 +123,6 @@ def state(self) -> State:
raise ValueError("Task run is not set")
return self.task_run.state

def can_retry(self, exc: Exception) -> bool:
retry_condition: Optional[
Callable[[Task[P, Coroutine[Any, Any, R]], TaskRun, State], bool]
] = self.task.retry_condition_fn
if not self.task_run:
raise ValueError("Task run is not set")
try:
self.logger.debug(
f"Running `retry_condition_fn` check {retry_condition!r} for task"
f" {self.task.name!r}"
)
state = Failed(
data=exc,
message=f"Task run encountered unexpected exception: {repr(exc)}",
)
if inspect.iscoroutinefunction(retry_condition):
should_retry = run_coro_as_sync(
retry_condition(self.task, self.task_run, state)
)
elif inspect.isfunction(retry_condition):
should_retry = retry_condition(self.task, self.task_run, state)
else:
should_retry = not retry_condition
return should_retry
except Exception:
self.logger.error(
(
"An error was encountered while running `retry_condition_fn` check"
f" '{retry_condition!r}' for task {self.task.name!r}"
),
exc_info=True,
)
return False

def is_cancelled(self) -> bool:
if (
self.context
Expand Down Expand Up @@ -255,21 +221,21 @@ def log_finished_message(self):
msg += dedent(
"""
Example:
Example:
from prefect import flow, task
from prefect import flow, task
@task
def say_hello(name):
print f"Hello, {name}!"
@task
def say_hello(name):
print f"Hello, {name}!"
@flow
def example_flow():
future = say_hello.submit(name="Marvin)
future.wait()
@flow
def example_flow():
future = say_hello.submit(name="Marvin)
future.wait()
example_flow()
"""
example_flow()
"""
)
self.logger.log(
level=level,
Expand All @@ -287,6 +253,40 @@ def client(self) -> SyncPrefectClient:
raise RuntimeError("Engine has not started.")
return self._client

def can_retry(self, exc: Exception) -> bool:
retry_condition: Optional[
Callable[[Task[P, Coroutine[Any, Any, R]], TaskRun, State], bool]
] = self.task.retry_condition_fn
if not self.task_run:
raise ValueError("Task run is not set")
try:
self.logger.debug(
f"Running `retry_condition_fn` check {retry_condition!r} for task"
f" {self.task.name!r}"
)
state = Failed(
data=exc,
message=f"Task run encountered unexpected exception: {repr(exc)}",
)
if inspect.iscoroutinefunction(retry_condition):
should_retry = run_coro_as_sync(
retry_condition(self.task, self.task_run, state)
)
elif inspect.isfunction(retry_condition):
should_retry = retry_condition(self.task, self.task_run, state)
else:
should_retry = not retry_condition
return should_retry
except Exception:
self.logger.error(
(
"An error was encountered while running `retry_condition_fn` check"
f" '{retry_condition!r}' for task {self.task.name!r}"
),
exc_info=True,
)
return False

def call_hooks(self, state: Optional[State] = None):
if state is None:
state = self.state
Expand Down Expand Up @@ -798,6 +798,39 @@ def client(self) -> PrefectClient:
raise RuntimeError("Engine has not started.")
return self._client

async def can_retry(self, exc: Exception) -> bool:
retry_condition: Optional[
Callable[[Task[P, Coroutine[Any, Any, R]], TaskRun, State], bool]
] = self.task.retry_condition_fn
if not self.task_run:
raise ValueError("Task run is not set")
try:
self.logger.debug(
f"Running `retry_condition_fn` check {retry_condition!r} for task"
f" {self.task.name!r}"
)
state = Failed(
data=exc,
message=f"Task run encountered unexpected exception: {repr(exc)}",
)
if inspect.iscoroutinefunction(retry_condition):
should_retry = await retry_condition(self.task, self.task_run, state)
elif inspect.isfunction(retry_condition):
should_retry = retry_condition(self.task, self.task_run, state)
else:
should_retry = not retry_condition
return should_retry

except Exception:
self.logger.error(
(
"An error was encountered while running `retry_condition_fn` check"
f" '{retry_condition!r}' for task {self.task.name!r}"
),
exc_info=True,
)
return False

async def call_hooks(self, state: Optional[State] = None):
if state is None:
state = self.state
Expand Down Expand Up @@ -991,7 +1024,7 @@ async def handle_retry(self, exc: Exception) -> bool:
- If the task has a retry delay, place in AwaitingRetry state with a delayed scheduled time.
- If the task has no retries left, or the retry condition is not met, return False.
"""
if self.retries < self.task.retries and self.can_retry(exc):
if self.retries < self.task.retries and await self.can_retry(exc):
if self.task.retry_delay_seconds:
delay = (
self.task.retry_delay_seconds[
Expand Down Expand Up @@ -1129,19 +1162,23 @@ async def initialize_run(
self._client = client_ctx.client
self._is_started = True
try:
if not self.task_run:
if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION:
# TODO - this maybe should be a method on Task?
from prefect.utilities.engine import (
_resolve_custom_task_run_name,
if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION:
from prefect.utilities.engine import (
_resolve_custom_task_run_name,
)

task_run_name = (
_resolve_custom_task_run_name(
task=self.task, parameters=self.parameters
)
if self.task.task_run_name
else None
)

task_run_name = None
if not self._task_name_set and self.task.task_run_name:
task_run_name = _resolve_custom_task_run_name(
task=self.task, parameters=self.parameters
)
if self.task_run and task_run_name:
self.task_run.name = task_run_name

if not self.task_run:
self.task_run = await self.task.create_local_run(
id=task_run_id,
parameters=self.parameters,
Expand All @@ -1151,7 +1188,8 @@ async def initialize_run(
extra_task_inputs=dependencies,
task_run_name=task_run_name,
)
else:
else:
if not self.task_run:
self.task_run = await self.task.create_run(
id=task_run_id,
parameters=self.parameters,
Expand Down

0 comments on commit e57689e

Please sign in to comment.