Skip to content

Commit

Permalink
remove sync_compatible from git_clone
Browse files Browse the repository at this point in the history
da heck
  • Loading branch information
zzstoatzz committed Dec 11, 2024
1 parent 686d4c3 commit d5fc978
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 34 deletions.
21 changes: 15 additions & 6 deletions src/prefect/_internal/retries.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import asyncio
from functools import wraps
from typing import Any, Callable, Tuple, Type
from typing import Callable, Optional, Tuple, Type, TypeVar

from typing_extensions import ParamSpec

from prefect._internal._logging import logger
from prefect.utilities.math import clamped_poisson_interval

P = ParamSpec("P")
R = TypeVar("R")


def exponential_backoff_with_jitter(
attempt: int, base_delay: float, max_delay: float
Expand All @@ -21,7 +26,8 @@ def retry_async_fn(
base_delay: float = 1,
max_delay: float = 10,
retry_on_exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
operation_name: Optional[str] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""A decorator for retrying an async function.
Args:
Expand All @@ -33,23 +39,26 @@ def retry_async_fn(
max_delay: The maximum delay to use for the last attempt.
retry_on_exceptions: A tuple of exception types to retry on. Defaults to
retrying on all exceptions.
operation_name: Optional name to use for logging the operation instead of
the function name. If None, uses the function name.
"""

def decorator(func):
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
name = operation_name or func.__name__
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except retry_on_exceptions as e:
if attempt == max_attempts - 1:
logger.exception(
f"Function {func.__name__!r} failed after {max_attempts} attempts"
f"Function {name!r} failed after {max_attempts} attempts"
)
raise
delay = backoff_strategy(attempt, base_delay, max_delay)
logger.warning(
f"Attempt {attempt + 1} of function {func.__name__!r} failed with {type(e).__name__}. "
f"Attempt {attempt + 1} of function {name!r} failed with {type(e).__name__}: {str(e)}. "
f"Retrying in {delay:.2f} seconds..."
)
await asyncio.sleep(delay)
Expand Down
4 changes: 3 additions & 1 deletion src/prefect/deployments/steps/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def _get_function_for_step(
return step_func


async def run_step(step: Dict, upstream_outputs: Optional[Dict] = None) -> Dict:
async def run_step(
step: dict[str, Any], upstream_outputs: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
"""
Runs a step, returns the step's output.
Expand Down
70 changes: 59 additions & 11 deletions src/prefect/deployments/steps/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional

from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect._internal.retries import retry_async_fn
from prefect.logging.loggers import get_logger
from prefect.runner.storage import BlockStorageAdapter, GitRepository, RemoteStorage
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.asyncutils import run_coro_as_sync

deployment_logger = get_logger("deployment")

if TYPE_CHECKING:
from prefect.blocks.core import Block


def set_working_directory(directory: str) -> dict:
def set_working_directory(directory: str) -> dict[str, str]:
"""
Sets the working directory; works with both absolute and relative paths.
Expand All @@ -37,15 +38,64 @@ def set_working_directory(directory: str) -> dict:
base_delay=1,
max_delay=10,
retry_on_exceptions=(RuntimeError,),
operation_name="git_clone",
)
@sync_compatible
async def git_clone(
async def _pull_git_repository_with_retries(repo: GitRepository):
await repo.pull_code()


async def agit_clone(
repository: str,
branch: Optional[str] = None,
include_submodules: bool = False,
access_token: Optional[str] = None,
credentials: Optional["Block"] = None,
) -> dict[str, str]:
"""
Asynchronously clones a git repository into the current working directory.
Args:
repository: the URL of the repository to clone
branch: the branch to clone; if not provided, the default branch will be used
include_submodules (bool): whether to include git submodules when cloning the repository
access_token: an access token to use for cloning the repository; if not provided
the repository will be cloned using the default git credentials
credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the
credentials to use for cloning the repository.
Returns:
dict: a dictionary containing a `directory` key of the new directory that was created
Raises:
subprocess.CalledProcessError: if the git clone command fails for any reason
"""
if access_token and credentials:
raise ValueError(
"Please provide either an access token or credentials but not both."
)

_credentials = {"access_token": access_token} if access_token else credentials

storage = GitRepository(
url=repository,
credentials=_credentials,
branch=branch,
include_submodules=include_submodules,
)

await _pull_git_repository_with_retries(storage)

return dict(directory=str(storage.destination.relative_to(Path.cwd())))


@async_dispatch(agit_clone)
def git_clone(
repository: str,
branch: Optional[str] = None,
include_submodules: bool = False,
access_token: Optional[str] = None,
credentials: Optional["Block"] = None,
) -> dict:
) -> dict[str, str]:
"""
Clones a git repository into the current working directory.
Expand Down Expand Up @@ -120,20 +170,18 @@ async def git_clone(
"Please provide either an access token or credentials but not both."
)

credentials = {"access_token": access_token} if access_token else credentials
_credentials = {"access_token": access_token} if access_token else credentials

storage = GitRepository(
url=repository,
credentials=credentials,
credentials=_credentials,
branch=branch,
include_submodules=include_submodules,
)

await storage.pull_code()
run_coro_as_sync(_pull_git_repository_with_retries(storage))

directory = str(storage.destination.relative_to(Path.cwd()))
deployment_logger.info(f"Cloned repository {repository!r} into {directory!r}")
return {"directory": directory}
return dict(directory=str(storage.destination.relative_to(Path.cwd())))


async def pull_from_remote_storage(url: str, **settings: Any):
Expand Down
116 changes: 100 additions & 16 deletions tests/deployment/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from prefect.client.orchestration import PrefectClient
from prefect.deployments.steps import run_step
from prefect.deployments.steps.core import StepExecutionError, run_steps
from prefect.deployments.steps.pull import agit_clone
from prefect.deployments.steps.utility import run_shell_script
from prefect.testing.utilities import AsyncMock, MagicMock
from prefect.utilities.filesystem import tmpchdir
Expand Down Expand Up @@ -518,22 +519,10 @@ async def mock_sleep(seconds):
monkeypatch.setattr("asyncio.sleep", mock_sleep)

with caplog.at_level("WARNING"):
result = await run_step(
{
"prefect.deployments.steps.git_clone": {
"repository": "https://github.com/org/repo.git"
}
}
)
result = await agit_clone(repository="https://github.com/org/repo.git")

assert (
"Attempt 1 of function 'git_clone' failed with RuntimeError. Retrying in "
in caplog.text
)
assert (
"Attempt 2 of function 'git_clone' failed with RuntimeError. Retrying in "
in caplog.text
)
assert "Octocat went out to lunch" in caplog.text
assert "Octocat is playing chess in the break room" in caplog.text

assert result == {"directory": "repo"}

Expand All @@ -544,7 +533,102 @@ async def mock_sleep(seconds):
include_submodules=False,
)

assert mock_git_repo.call_args_list == [expected_call] * 3
assert mock_git_repo.call_args_list == [expected_call]

async def test_agit_clone_basic(self, git_repository_mock):
"""Test basic async git clone functionality"""
output = await agit_clone(repository="https://github.com/org/repo.git")

assert output["directory"] == "repo"
git_repository_mock.assert_called_once_with(
url="https://github.com/org/repo.git",
credentials=None,
branch=None,
include_submodules=False,
)
git_repository_mock.return_value.pull_code.assert_awaited_once()

async def test_agit_clone_with_all_options(self, git_repository_mock):
"""Test async git clone with all options specified"""
await Secret(value="my-access-token").save(name="test-token")

output = await agit_clone(
repository="https://github.com/org/repo.git",
branch="dev",
include_submodules=True,
access_token="my-access-token",
)

assert output["directory"] == "repo"
git_repository_mock.assert_called_once_with(
url="https://github.com/org/repo.git",
credentials={"access_token": "my-access-token"},
branch="dev",
include_submodules=True,
)
git_repository_mock.return_value.pull_code.assert_awaited_once()

async def test_agit_clone_with_credentials_block(self, git_repository_mock):
"""Test async git clone with credentials block"""

class MockGitCredentials(Block):
username: str
password: str

creds = MockGitCredentials(username="marvin42", password="hunter2")

output = await agit_clone(
repository="https://github.com/org/repo.git", credentials=creds
)

assert output["directory"] == "repo"
git_repository_mock.assert_called_once_with(
url="https://github.com/org/repo.git",
credentials=creds,
branch=None,
include_submodules=False,
)
git_repository_mock.return_value.pull_code.assert_awaited_once()

async def test_agit_clone_raises_on_both_auth_methods(self):
"""Test that providing both access_token and credentials raises an error"""
with pytest.raises(
ValueError,
match="Please provide either an access token or credentials but not both",
):
await agit_clone(
repository="https://github.com/org/repo.git",
access_token="token",
credentials=MagicMock(),
)

async def test_agit_clone_retry(self, monkeypatch, caplog):
"""Test retry behavior of async git clone"""
mock_git_repo = MagicMock()
mock_git_repo.return_value.pull_code = AsyncMock(
side_effect=[
RuntimeError("Network timeout"),
RuntimeError("Server busy"),
None, # Success on third try
]
)
mock_git_repo.return_value.destination.relative_to.return_value = "repo"
monkeypatch.setattr(
"prefect.deployments.steps.pull.GitRepository", mock_git_repo
)

async def mock_sleep(seconds):
pass

monkeypatch.setattr("asyncio.sleep", mock_sleep)

with caplog.at_level("WARNING"):
result = await agit_clone(repository="https://github.com/org/repo.git")

assert result == {"directory": "repo"}
assert mock_git_repo.return_value.pull_code.await_count == 3
assert "Network timeout" in caplog.text
assert "Server busy" in caplog.text


class TestPullFromRemoteStorage:
Expand Down

0 comments on commit d5fc978

Please sign in to comment.