From b4fe05a9cc956eb77d9f3dabeab3fbc1ffd50269 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Wed, 11 Dec 2024 10:09:48 -0600 Subject: [PATCH] remove `sync_compatible` from `git_clone` --- src/prefect/_internal/retries.py | 21 +++-- src/prefect/deployments/steps/core.py | 4 +- src/prefect/deployments/steps/pull.py | 71 +++++++++++++--- tests/deployment/test_steps.py | 116 ++++++++++++++++++++++---- 4 files changed, 178 insertions(+), 34 deletions(-) diff --git a/src/prefect/_internal/retries.py b/src/prefect/_internal/retries.py index 097305e3e7c2..08cc21e9a252 100644 --- a/src/prefect/_internal/retries.py +++ b/src/prefect/_internal/retries.py @@ -1,10 +1,15 @@ import asyncio from functools import wraps -from typing import Any, Callable, Tuple, Type +from typing import Callable, Optional, Tuple, Type, TypeVar + +from typing_extensions import ParamSpec from prefect._internal._logging import logger from prefect.utilities.math import clamped_poisson_interval +P = ParamSpec("P") +R = TypeVar("R") + def exponential_backoff_with_jitter( attempt: int, base_delay: float, max_delay: float @@ -21,7 +26,8 @@ def retry_async_fn( base_delay: float = 1, max_delay: float = 10, retry_on_exceptions: Tuple[Type[Exception], ...] = (Exception,), -): + operation_name: Optional[str] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: """A decorator for retrying an async function. Args: @@ -33,23 +39,26 @@ def retry_async_fn( max_delay: The maximum delay to use for the last attempt. retry_on_exceptions: A tuple of exception types to retry on. Defaults to retrying on all exceptions. + operation_name: Optional name to use for logging the operation instead of + the function name. If None, uses the function name. 
""" - def decorator(func): + def decorator(func: Callable[P, R]) -> Callable[P, R]: @wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + name = operation_name or func.__name__ for attempt in range(max_attempts): try: return await func(*args, **kwargs) except retry_on_exceptions as e: if attempt == max_attempts - 1: logger.exception( - f"Function {func.__name__!r} failed after {max_attempts} attempts" + f"Function {name!r} failed after {max_attempts} attempts" ) raise delay = backoff_strategy(attempt, base_delay, max_delay) logger.warning( - f"Attempt {attempt + 1} of function {func.__name__!r} failed with {type(e).__name__}. " + f"Attempt {attempt + 1} of function {name!r} failed with {type(e).__name__}: {str(e)}. " f"Retrying in {delay:.2f} seconds..." ) await asyncio.sleep(delay) diff --git a/src/prefect/deployments/steps/core.py b/src/prefect/deployments/steps/core.py index 8938ad46a6e6..ef6118b297a9 100644 --- a/src/prefect/deployments/steps/core.py +++ b/src/prefect/deployments/steps/core.py @@ -99,7 +99,9 @@ def _get_function_for_step( return step_func -async def run_step(step: Dict, upstream_outputs: Optional[Dict] = None) -> Dict: +async def run_step( + step: dict[str, Any], upstream_outputs: Optional[dict[str, Any]] = None +) -> dict[str, Any]: """ Runs a step, returns the step's output. 
diff --git a/src/prefect/deployments/steps/pull.py b/src/prefect/deployments/steps/pull.py index 8f2a82f54cb9..b79c5d28b9e6 100644 --- a/src/prefect/deployments/steps/pull.py +++ b/src/prefect/deployments/steps/pull.py @@ -6,10 +6,12 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Optional +from prefect._internal.compatibility.async_dispatch import async_dispatch + from prefect._internal.retries import retry_async_fn from prefect.logging.loggers import get_logger from prefect.runner.storage import BlockStorageAdapter, GitRepository, RemoteStorage -from prefect.utilities.asyncutils import sync_compatible +from prefect.utilities.asyncutils import run_coro_as_sync deployment_logger = get_logger("deployment") @@ -17,7 +19,7 @@ from prefect.blocks.core import Block -def set_working_directory(directory: str) -> dict: +def set_working_directory(directory: str) -> dict[str, str]: """ Sets the working directory; works with both absolute and relative paths. @@ -37,15 +39,64 @@ def set_working_directory(directory: str) -> dict: base_delay=1, max_delay=10, retry_on_exceptions=(RuntimeError,), + operation_name="git_clone", ) -@sync_compatible -async def git_clone( +async def _pull_git_repository_with_retries(repo: GitRepository): + await repo.pull_code() + + +async def agit_clone( repository: str, branch: Optional[str] = None, include_submodules: bool = False, access_token: Optional[str] = None, credentials: Optional["Block"] = None, -) -> dict: +) -> dict[str, str]: + """ + Asynchronously clones a git repository into the current working directory. 
+ + Args: + repository: the URL of the repository to clone + branch: the branch to clone; if not provided, the default branch will be used + include_submodules (bool): whether to include git submodules when cloning the repository + access_token: an access token to use for cloning the repository; if not provided + the repository will be cloned using the default git credentials + credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the + credentials to use for cloning the repository. + + Returns: + dict: a dictionary containing a `directory` key of the new directory that was created + + Raises: + subprocess.CalledProcessError: if the git clone command fails for any reason + """ + if access_token and credentials: + raise ValueError( + "Please provide either an access token or credentials but not both." + ) + + _credentials = {"access_token": access_token} if access_token else credentials + + storage = GitRepository( + url=repository, + credentials=_credentials, + branch=branch, + include_submodules=include_submodules, + ) + + await _pull_git_repository_with_retries(storage) + + return dict(directory=str(storage.destination.relative_to(Path.cwd()))) + + +@async_dispatch(agit_clone) +def git_clone( + repository: str, + branch: Optional[str] = None, + include_submodules: bool = False, + access_token: Optional[str] = None, + credentials: Optional["Block"] = None, +) -> dict[str, str]: """ Clones a git repository into the current working directory. @@ -120,20 +171,18 @@ async def git_clone( "Please provide either an access token or credentials but not both." 
) - credentials = {"access_token": access_token} if access_token else credentials + _credentials = {"access_token": access_token} if access_token else credentials storage = GitRepository( url=repository, - credentials=credentials, + credentials=_credentials, branch=branch, include_submodules=include_submodules, ) - await storage.pull_code() + run_coro_as_sync(_pull_git_repository_with_retries(storage)) - directory = str(storage.destination.relative_to(Path.cwd())) - deployment_logger.info(f"Cloned repository {repository!r} into {directory!r}") - return {"directory": directory} + return dict(directory=str(storage.destination.relative_to(Path.cwd()))) async def pull_from_remote_storage(url: str, **settings: Any): diff --git a/tests/deployment/test_steps.py b/tests/deployment/test_steps.py index 9da22636da35..69eeb3bc5a21 100644 --- a/tests/deployment/test_steps.py +++ b/tests/deployment/test_steps.py @@ -15,6 +15,7 @@ from prefect.client.orchestration import PrefectClient from prefect.deployments.steps import run_step from prefect.deployments.steps.core import StepExecutionError, run_steps +from prefect.deployments.steps.pull import agit_clone from prefect.deployments.steps.utility import run_shell_script from prefect.testing.utilities import AsyncMock, MagicMock from prefect.utilities.filesystem import tmpchdir @@ -518,22 +519,10 @@ async def mock_sleep(seconds): monkeypatch.setattr("asyncio.sleep", mock_sleep) with caplog.at_level("WARNING"): - result = await run_step( - { - "prefect.deployments.steps.git_clone": { - "repository": "https://github.com/org/repo.git" - } - } - ) + result = await agit_clone(repository="https://github.com/org/repo.git") - assert ( - "Attempt 1 of function 'git_clone' failed with RuntimeError. Retrying in " - in caplog.text - ) - assert ( - "Attempt 2 of function 'git_clone' failed with RuntimeError. 
Retrying in " - in caplog.text - ) + assert "Octocat went out to lunch" in caplog.text + assert "Octocat is playing chess in the break room" in caplog.text assert result == {"directory": "repo"} @@ -544,7 +533,102 @@ async def mock_sleep(seconds): include_submodules=False, ) - assert mock_git_repo.call_args_list == [expected_call] * 3 + assert mock_git_repo.call_args_list == [expected_call] + + async def test_agit_clone_basic(self, git_repository_mock): + """Test basic async git clone functionality""" + output = await agit_clone(repository="https://github.com/org/repo.git") + + assert output["directory"] == "repo" + git_repository_mock.assert_called_once_with( + url="https://github.com/org/repo.git", + credentials=None, + branch=None, + include_submodules=False, + ) + git_repository_mock.return_value.pull_code.assert_awaited_once() + + async def test_agit_clone_with_all_options(self, git_repository_mock): + """Test async git clone with all options specified""" + await Secret(value="my-access-token").save(name="test-token") + + output = await agit_clone( + repository="https://github.com/org/repo.git", + branch="dev", + include_submodules=True, + access_token="my-access-token", + ) + + assert output["directory"] == "repo" + git_repository_mock.assert_called_once_with( + url="https://github.com/org/repo.git", + credentials={"access_token": "my-access-token"}, + branch="dev", + include_submodules=True, + ) + git_repository_mock.return_value.pull_code.assert_awaited_once() + + async def test_agit_clone_with_credentials_block(self, git_repository_mock): + """Test async git clone with credentials block""" + + class MockGitCredentials(Block): + username: str + password: str + + creds = MockGitCredentials(username="marvin42", password="hunter2") + + output = await agit_clone( + repository="https://github.com/org/repo.git", credentials=creds + ) + + assert output["directory"] == "repo" + git_repository_mock.assert_called_once_with( + url="https://github.com/org/repo.git", + 
credentials=creds, + branch=None, + include_submodules=False, + ) + git_repository_mock.return_value.pull_code.assert_awaited_once() + + async def test_agit_clone_raises_on_both_auth_methods(self): + """Test that providing both access_token and credentials raises an error""" + with pytest.raises( + ValueError, + match="Please provide either an access token or credentials but not both", + ): + await agit_clone( + repository="https://github.com/org/repo.git", + access_token="token", + credentials=MagicMock(), + ) + + async def test_agit_clone_retry(self, monkeypatch, caplog): + """Test retry behavior of async git clone""" + mock_git_repo = MagicMock() + mock_git_repo.return_value.pull_code = AsyncMock( + side_effect=[ + RuntimeError("Network timeout"), + RuntimeError("Server busy"), + None, # Success on third try + ] + ) + mock_git_repo.return_value.destination.relative_to.return_value = "repo" + monkeypatch.setattr( + "prefect.deployments.steps.pull.GitRepository", mock_git_repo + ) + + async def mock_sleep(seconds): + pass + + monkeypatch.setattr("asyncio.sleep", mock_sleep) + + with caplog.at_level("WARNING"): + result = await agit_clone(repository="https://github.com/org/repo.git") + + assert result == {"directory": "repo"} + assert mock_git_repo.return_value.pull_code.await_count == 3 + assert "Network timeout" in caplog.text + assert "Server busy" in caplog.text class TestPullFromRemoteStorage: