Skip to content

Commit

Permalink
Merge branch 'main' into populate-default-profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jul 29, 2024
2 parents 15f2bd5 + 85bea69 commit a58aee7
Show file tree
Hide file tree
Showing 31 changed files with 1,661 additions and 411 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/api-compatibility-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ concurrency:
jobs:
compatibility-tests:

if: false # temporarily pausing compat tests on OSS

timeout-minutes: 10

runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codspeed-benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
# https://github.com/PrefectHQ/prefect/issues/6990
- name: Run benchmarks
uses: CodSpeedHQ/action@v2
uses: CodSpeedHQ/action@v3
env:
PREFECT_API_URL: "http://127.0.0.1:4200/api"
with:
Expand Down
10 changes: 10 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -12600,6 +12600,11 @@
}
],
"title": "Occupancy Seconds"
},
"create_if_missing": {
"type": "boolean",
"title": "Create If Missing",
"default": true
}
},
"type": "object",
Expand Down Expand Up @@ -12632,6 +12637,11 @@
],
"title": "Mode",
"default": "concurrency"
},
"create_if_missing": {
"type": "boolean",
"title": "Create If Missing",
"default": true
}
},
"type": "object",
Expand Down
20 changes: 20 additions & 0 deletions src/integrations/prefect-dask/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from prefect_dask import DaskTaskRunner

from prefect import flow, task
from prefect.futures import as_completed
from prefect.server.schemas.states import StateType
from prefect.states import State
from prefect.testing.fixtures import ( # noqa: F401
Expand Down Expand Up @@ -248,6 +249,25 @@ def test_flow():
assert bx.type == StateType.PENDING
assert cx.type == StateType.COMPLETED

def test_as_completed_yields_correct_order(self, task_runner):
@task
def sleep_task(seconds):
time.sleep(seconds)
return seconds

timings = [1, 5, 10]
with task_runner:
done_futures = []
futures = [
task_runner.submit(
sleep_task, parameters={"seconds": seconds}, wait_for=[]
)
for seconds in reversed(timings)
]
for future in as_completed(futures=futures):
done_futures.append(future.result())
assert done_futures == timings

async def test_wait_captures_exceptions_as_crashed_state(self, task_runner):
"""
Dask wraps the exception, interrupts will result in "Cancelled" tasks
Expand Down
11 changes: 11 additions & 0 deletions src/integrations/prefect-ray/prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def add_done_callback(self, fn):
if not self._final_state:

def call_with_self(future):
"""Call the callback with self as the argument, this is necessary to ensure we remove the future from the pending set"""
fn(self)

self._wrapped_future._on_completed(call_with_self)
return
fn(self)

def __del__(self):
if self._final_state:
return
Expand Down
19 changes: 19 additions & 0 deletions src/integrations/prefect-ray/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import prefect.task_engine
import tests
from prefect import flow, task
from prefect.futures import as_completed
from prefect.states import State, StateType
from prefect.testing.fixtures import ( # noqa: F401
hosted_api_server,
Expand Down Expand Up @@ -371,6 +372,24 @@ def test_flow():
assert bx.type == StateType.PENDING
assert cx.type == StateType.COMPLETED

def test_as_completed_yields_correct_order(self, task_runner):
@task
def task_a(seconds):
time.sleep(seconds)
return seconds

timings = [1, 5, 10]

@flow(version="test", task_runner=task_runner)
def test_flow():
done_futures = []
futures = [task_a.submit(seconds) for seconds in reversed(timings)]
for future in as_completed(futures=futures):
done_futures.append(future.result())
assert done_futures[-1] == timings[-1]

test_flow()

def get_sleep_time(self) -> float:
"""
Return an amount of time to sleep for concurrency tests.
Expand Down
29 changes: 26 additions & 3 deletions src/prefect/blocks/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,37 @@ class MicrosoftTeamsWebhook(AppriseNotificationBlock):
_documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.MicrosoftTeamsWebhook"

url: SecretStr = Field(
...,
default=...,
title="Webhook URL",
description="The Teams incoming webhook URL used to send notifications.",
description="The Microsoft Power Automate (Workflows) URL used to send notifications to Teams.",
examples=[
"https://your-org.webhook.office.com/webhookb2/XXX/IncomingWebhook/YYY/ZZZ"
"https://prod-NO.LOCATION.logic.azure.com:443/workflows/WFID/triggers/manual/paths/invoke?api-version=2016-06-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=SIGNATURE"
],
)

include_image: bool = Field(
default=True,
description="Include an image with the notification.",
)

wrap: bool = Field(
default=True,
description="Wrap the notification text.",
)

def block_initialization(self) -> None:
"""see https://github.com/caronc/apprise/pull/1172"""
from apprise.plugins.workflows import NotifyWorkflows

if not (
parsed_url := NotifyWorkflows.parse_native_url(self.url.get_secret_value())
):
raise ValueError("Invalid Microsoft Teams Workflow URL provided.")

parsed_url |= {"include_image": self.include_image, "wrap": self.wrap}

self._start_apprise_client(SecretStr(NotifyWorkflows(**parsed_url).url()))


class PagerDutyWebHook(AbstractAppriseNotificationBlock):
"""
Expand Down
35 changes: 19 additions & 16 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,18 @@ def get_client(
except RuntimeError:
loop = None

if client_ctx := prefect.context.ClientContext.get():
if (
sync_client
and client_ctx.sync_client
and client_ctx._httpx_settings == httpx_settings
):
return client_ctx.sync_client
elif (
not sync_client
and client_ctx.async_client
and client_ctx._httpx_settings == httpx_settings
and loop in (client_ctx.async_client._loop, None)
):
return client_ctx.async_client
if sync_client:
if client_ctx := prefect.context.SyncClientContext.get():
if client_ctx.client and client_ctx._httpx_settings == httpx_settings:
return client_ctx.client
else:
if client_ctx := prefect.context.AsyncClientContext.get():
if (
client_ctx.client
and client_ctx._httpx_settings == httpx_settings
and loop in (client_ctx.client._loop, None)
):
return client_ctx.client

api = PREFECT_API_URL.value()

Expand Down Expand Up @@ -3012,11 +3010,16 @@ async def read_worker_metadata(self) -> Dict[str, Any]:
return response.json()

async def increment_concurrency_slots(
self, names: List[str], slots: int, mode: str
self, names: List[str], slots: int, mode: str, create_if_missing: Optional[bool]
) -> httpx.Response:
return await self._client.post(
"/v2/concurrency_limits/increment",
json={"names": names, "slots": slots, "mode": mode},
json={
"names": names,
"slots": slots,
"mode": mode,
"create_if_missing": create_if_missing,
},
)

async def release_concurrency_slots(
Expand Down
8 changes: 4 additions & 4 deletions src/prefect/client/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ def get_or_create_client(
if client is not None:
return client, True
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect.context import ClientContext, FlowRunContext, TaskRunContext
from prefect.context import AsyncClientContext, FlowRunContext, TaskRunContext

client_context = ClientContext.get()
async_client_context = AsyncClientContext.get()
flow_run_context = FlowRunContext.get()
task_run_context = TaskRunContext.get()

if client_context and client_context.async_client._loop == get_running_loop():
return client_context.async_client, True
if async_client_context and async_client_context.client._loop == get_running_loop():
return async_client_context.client, True
elif (
flow_run_context
and getattr(flow_run_context.client, "_loop", None) == get_running_loop()
Expand Down
28 changes: 24 additions & 4 deletions src/prefect/concurrency/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async def concurrency(
names: Union[str, List[str]],
occupy: int = 1,
timeout_seconds: Optional[float] = None,
create_if_missing: Optional[bool] = True,
) -> AsyncGenerator[None, None]:
"""A context manager that acquires and releases concurrency slots from the
given concurrency limits.
Expand All @@ -43,6 +44,7 @@ async def concurrency(
occupy: The number of slots to acquire and hold from each limit.
timeout_seconds: The number of seconds to wait for the slots to be acquired before
raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
create_if_missing: Whether to create the concurrency limits if they do not exist.
Raises:
TimeoutError: If the slots are not acquired within the given timeout.
Expand All @@ -62,7 +64,10 @@ async def main():
"""
names = names if isinstance(names, list) else [names]
limits = await _acquire_concurrency_slots(
names, occupy, timeout_seconds=timeout_seconds
names,
occupy,
timeout_seconds=timeout_seconds,
create_if_missing=create_if_missing,
)
acquisition_time = pendulum.now("UTC")
emitted_events = _emit_concurrency_acquisition_events(limits, occupy)
Expand All @@ -77,17 +82,31 @@ async def main():
_emit_concurrency_release_events(limits, occupy, emitted_events)


async def rate_limit(names: Union[str, List[str]], occupy: int = 1) -> None:
async def rate_limit(
names: Union[str, List[str]],
occupy: int = 1,
timeout_seconds: Optional[float] = None,
create_if_missing: Optional[bool] = True,
) -> None:
"""Block execution until an `occupy` number of slots of the concurrency
limits given in `names` are acquired. Requires that all given concurrency
limits have a slot decay.
Args:
names: The names of the concurrency limits to acquire slots from.
occupy: The number of slots to acquire and hold from each limit.
timeout_seconds: The number of seconds to wait for the slots to be acquired before
raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
create_if_missing: Whether to create the concurrency limits if they do not exist.
"""
names = names if isinstance(names, list) else [names]
limits = await _acquire_concurrency_slots(names, occupy, mode="rate_limit")
limits = await _acquire_concurrency_slots(
names,
occupy,
mode="rate_limit",
timeout_seconds=timeout_seconds,
create_if_missing=create_if_missing,
)
_emit_concurrency_acquisition_events(limits, occupy)


Expand All @@ -96,9 +115,10 @@ async def _acquire_concurrency_slots(
slots: int,
mode: Union[Literal["concurrency"], Literal["rate_limit"]] = "concurrency",
timeout_seconds: Optional[float] = None,
create_if_missing: Optional[bool] = True,
) -> List[MinimalConcurrencyLimitResponse]:
service = ConcurrencySlotAcquisitionService.instance(frozenset(names))
future = service.send((slots, mode, timeout_seconds))
future = service.send((slots, mode, timeout_seconds, create_if_missing))
response_or_exception = await asyncio.wrap_future(future)

if isinstance(response_or_exception, Exception):
Expand Down
32 changes: 24 additions & 8 deletions src/prefect/concurrency/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ async def _lifespan(self) -> AsyncGenerator[None, None]:
yield

async def _handle(
self, item: Tuple[int, str, Optional[float], concurrent.futures.Future]
self,
item: Tuple[
int, str, Optional[float], concurrent.futures.Future, Optional[bool]
],
) -> None:
occupy, mode, timeout_seconds, future = item
occupy, mode, timeout_seconds, future, create_if_missing = item
try:
response = await self.acquire_slots(occupy, mode, timeout_seconds)
response = await self.acquire_slots(
occupy, mode, timeout_seconds, create_if_missing
)
except Exception as exc:
# If the request to the increment endpoint fails in a non-standard
# way, we need to set the future's result so that the caller can
Expand All @@ -49,13 +54,20 @@ async def _handle(
future.set_result(response)

async def acquire_slots(
self, slots: int, mode: str, timeout_seconds: Optional[float] = None
self,
slots: int,
mode: str,
timeout_seconds: Optional[float] = None,
create_if_missing: Optional[bool] = False,
) -> httpx.Response:
with timeout_async(seconds=timeout_seconds):
while True:
try:
response = await self._client.increment_concurrency_slots(
names=self.concurrency_limit_names, slots=slots, mode=mode
names=self.concurrency_limit_names,
slots=slots,
mode=mode,
create_if_missing=create_if_missing,
)
except Exception as exc:
if (
Expand All @@ -69,15 +81,19 @@ async def acquire_slots(
else:
return response

def send(self, item: Tuple[int, str, Optional[float]]) -> concurrent.futures.Future:
def send(
self, item: Tuple[int, str, Optional[float], Optional[bool]]
) -> concurrent.futures.Future:
with self._lock:
if self._stopped:
raise RuntimeError("Cannot put items in a stopped service instance.")

logger.debug("Service %r enqueuing item %r", self, item)
future: concurrent.futures.Future = concurrent.futures.Future()

occupy, mode, timeout_seconds = item
self._queue.put_nowait((occupy, mode, timeout_seconds, future))
occupy, mode, timeout_seconds, create_if_missing = item
self._queue.put_nowait(
(occupy, mode, timeout_seconds, future, create_if_missing)
)

return future
Loading

0 comments on commit a58aee7

Please sign in to comment.