
Merge branch 'main' into logs-csv-endpoint
pleek91 authored Jul 23, 2024
2 parents 6b7654f + e76351e commit 317b078
Showing 9 changed files with 68 additions and 218 deletions.
135 changes: 0 additions & 135 deletions .github/workflows/python-tests.yaml
@@ -219,141 +219,6 @@ jobs:
env:
SLACK_WEBHOOK_URL: ${{ secrets.ENGINEERING_REVIEW_SLACK_WEBHOOK_URL }}

run-tests-for-datadog:
name: DataDog CI Visibility
if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || contains(github.event.head_commit.message, '.github/workflows/python-tests.yaml'))
runs-on:
group: oss-larger-runners
strategy:
matrix:
database:
- "postgres:14"
python-version:
- "3.12"

fail-fast: true

timeout-minutes: 45

steps:
- name: Display current test matrix
run: echo '${{ toJSON(matrix) }}'

- uses: actions/checkout@v4
with:
persist-credentials: false
fetch-depth: 0

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver-opts: image=moby/buildkit:v0.12.5

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
id: setup_python
with:
python-version: ${{ matrix.python-version }}

- name: UV Cache
# Manually cache the uv cache directory
# until setup-python supports it:
# https://github.com/actions/setup-python/issues/822
uses: actions/cache@v4
id: cache-uv
with:
path: ~/.cache/uv
key: uvcache-${{ runner.os }}-${{ steps.setup_python.outputs.python-version }}-${{ hashFiles('requirements-client.txt', 'requirements.txt', 'requirements-dev.txt') }}

- name: Get image tag
id: get_image_tag
run: |
SHORT_SHA=$(git rev-parse --short=7 HEAD)
tmp="sha-$SHORT_SHA-python${{ matrix.python-version }}"
echo "image_tag=${tmp}" >> $GITHUB_OUTPUT
- name: Build test image
uses: docker/build-push-action@v6
with:
context: .
build-args: |
PYTHON_VERSION=${{ matrix.python-version }}
PREFECT_EXTRAS=[dev]
tags: prefecthq/prefect-dev:${{ steps.get_image_tag.outputs.image_tag }}
outputs: type=docker,dest=/tmp/image.tar

- name: Test Docker image
run: |
docker load --input /tmp/image.tar
docker run --rm prefecthq/prefect-dev:${{ steps.get_image_tag.outputs.image_tag }} prefect version
- name: Install packages
run: |
python -m pip install -U uv
uv pip install --upgrade --system -e .[dev]
- name: Start database container
run: >
docker run
--name "postgres"
--detach
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
--publish 5432:5432
--tmpfs /var/lib/postgresql/data
--env POSTGRES_USER="prefect"
--env POSTGRES_PASSWORD="prefect"
--env POSTGRES_DB="prefect"
--env LANG="C.UTF-8"
--env LANGUAGE="C.UTF-8"
--env LC_ALL="C.UTF-8"
--env LC_COLLATE="C.UTF-8"
--env LC_CTYPE="C.UTF-8"
${{ matrix.database }}
-c max_connections=250
./scripts/wait-for-healthy-container.sh postgres 30
echo "PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://prefect:prefect@localhost/prefect" >> $GITHUB_ENV
- name: Start docker registry
run: >
docker run
--name "prefect-test-registry"
--detach
--publish 5555:5000
registry:2
- name: Start redis
run: >
docker run
--name "redis"
--detach
--publish 6379:6379
redis:latest
- name: Run tests
env:
PREFECT_EXPERIMENTAL_ENABLE_PYDANTIC_V2_INTERNALS: "1"
DD_CIVISIBILITY_AGENTLESS_ENABLED: true
DD_API_KEY: ${{ secrets.DD_API_KEY_CI_VISIBILITY }}
DD_SITE: datadoghq.com
DD_ENV: ci
DD_SERVICE: prefect
run: >
pytest tests
--numprocesses auto
--maxprocesses 6
--ddtrace
--dist worksteal
--disable-docker-image-builds
--exclude-service kubernetes
--durations 26
--cov
--cov-config setup.cfg
run-docker-tests:
runs-on:
group: oss-larger-runners
1 change: 0 additions & 1 deletion requirements-dev.txt
@@ -1,7 +1,6 @@
ruff
cairosvg
codespell>=2.2.6
ddtrace
ipython
jinja2
moto >= 5
2 changes: 1 addition & 1 deletion src/prefect/flows.py
@@ -798,7 +798,7 @@ def serve(
cron: Optional[Union[Iterable[str], str]] = None,
rrule: Optional[Union[Iterable[str], str]] = None,
paused: Optional[bool] = None,
schedules: Optional[List["FlexibleScheduleList"]] = None,
schedules: Optional["FlexibleScheduleList"] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
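
The flows.py change narrows the schedules parameter of serve() from Optional[List["FlexibleScheduleList"]] to Optional["FlexibleScheduleList"]. FlexibleScheduleList already names a list-like collection of schedules, so the previous annotation effectively described a list of lists. A minimal usage sketch of the corrected signature, assuming IntervalSchedule and CronSchedule objects are accepted members of FlexibleScheduleList and using a hypothetical deployment name:

from datetime import timedelta

from prefect import flow
from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule


@flow
def my_flow():
    print("hello")


if __name__ == "__main__":
    # schedules is one flat list of schedule objects, not a list of lists
    my_flow.serve(
        name="my-deployment",  # hypothetical name
        schedules=[
            IntervalSchedule(interval=timedelta(hours=1)),
            CronSchedule(cron="0 9 * * MON-FRI"),
        ],
    )
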
2 changes: 2 additions & 0 deletions src/prefect/task_engine.py
@@ -433,6 +433,8 @@ def handle_retry(self, exc: Exception) -> bool:
else:
delay = None
new_state = Retrying()
if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION:
self.task_run.run_count += 1

self.logger.info(
"Task run failed with exception: %r - " "Retry %s/%s will start %s",
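
The task_engine.py hunk has the engine increment task_run.run_count itself when PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION is enabled, presumably because the server no longer bumps the counter on the Retrying transition in that mode. A stripped-down sketch of the pattern, using a hypothetical stand-in for the task run model rather than Prefect's actual classes:

from dataclasses import dataclass


@dataclass
class FakeTaskRun:
    run_count: int = 0  # how many times this task run has started


def handle_retry(task_run: FakeTaskRun, client_side_orchestration: bool) -> None:
    # With server-side orchestration the API owns run_count; when the client
    # orchestrates states itself, the engine must bump the counter before
    # proposing the Retrying state.
    if client_side_orchestration:
        task_run.run_count += 1


run = FakeTaskRun()
handle_retry(run, client_side_orchestration=True)
assert run.run_count == 1
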
48 changes: 32 additions & 16 deletions src/prefect/tasks.py
@@ -6,7 +6,6 @@

import datetime
import inspect
import os
from copy import copy
from functools import partial, update_wrapper
from typing import (
@@ -188,6 +187,31 @@ def _infer_parent_task_runs(
return parents


def _generate_task_key(fn: Callable[..., Any]) -> str:
"""Generate a task key based on the function name and source code.
We may eventually want some sort of top-level namespace here to
disambiguate tasks with the same function name in different modules,
in a more human-readable way, while avoiding relative import problems (see #12337).
As long as the task implementations are unique (even if named the same), we should
not have any collisions.
Args:
fn: The function to generate a task key for.
"""
if not hasattr(fn, "__qualname__"):
return to_qualified_name(type(fn))

qualname = fn.__qualname__.split(".")[-1]

code_hash = (
h[:NUM_CHARS_DYNAMIC_KEY] if (h := hash_objects(fn.__code__)) else "unknown"
)

return f"{qualname}-{code_hash}"


class Task(Generic[P, R]):
"""
A Prefect task definition.
@@ -270,7 +294,7 @@ def __init__(
description: Optional[str] = None,
tags: Optional[Iterable[str]] = None,
version: Optional[str] = None,
cache_policy: Optional[CachePolicy] = NotSet,
cache_policy: Union[CachePolicy, Type[NotSet]] = NotSet,
cache_key_fn: Optional[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
@@ -369,17 +393,7 @@ def __init__(

self.tags = set(tags if tags else [])

if not hasattr(self.fn, "__qualname__"):
self.task_key = to_qualified_name(type(self.fn))
else:
try:
task_origin_hash = hash_objects(
self.name, os.path.abspath(inspect.getsourcefile(self.fn))
)
except TypeError:
task_origin_hash = "unknown-source-file"

self.task_key = f"{self.fn.__qualname__}-{task_origin_hash}"
self.task_key = _generate_task_key(self.fn)

if cache_policy is not NotSet and cache_key_fn is not None:
logger.warning(
@@ -1496,7 +1510,7 @@ async def serve(self) -> NoReturn:
Args:
task_runner: The task runner to use for serving the task. If not provided,
the default ConcurrentTaskRunner will be used.
the default task runner will be used.
Examples:
Serve a task using the default task runner
@@ -1523,7 +1537,7 @@ def task(
description: Optional[str] = None,
tags: Optional[Iterable[str]] = None,
version: Optional[str] = None,
cache_policy: CachePolicy = NotSet,
cache_policy: Union[CachePolicy, Type[NotSet]] = NotSet,
cache_key_fn: Optional[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
@@ -1561,7 +1575,9 @@ def task(
tags: Optional[Iterable[str]] = None,
version: Optional[str] = None,
cache_policy: Union[CachePolicy, Type[NotSet]] = NotSet,
cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]] = None,
cache_key_fn: Union[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]], None
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
retries: Optional[int] = None,
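
The tasks.py changes replace the old task key, which combined the full __qualname__ with a hash of the task name and the absolute source file path, with _generate_task_key, which keeps only the final segment of the qualified name plus a truncated hash of the function's code object (falling back to "unknown" when the code object cannot be hashed). A quick illustration of the new key shape; the hash value shown is illustrative only and its length depends on NUM_CHARS_DYNAMIC_KEY:

from prefect import task


@task
def happy_little_tree():
    return "tree"


# Before this change the key was f"{fn.__qualname__}-{hash of (name, source
# file path)}", so nested tasks carried prefixes like
#   "test_task_state_change_happy_path.<locals>.happy_little_tree-<hash>"
# Now it is the final segment of the qualified name plus a short hash of the
# function's code object, e.g. "happy_little_tree-1a2b3c4d".
print(happy_little_tree.task_key)

The same file also loosens the cache_policy annotations from CachePolicy to Union[CachePolicy, Type[NotSet]], so the NotSet sentinel class used as the default value no longer conflicts with the type hint.
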
@@ -60,11 +60,7 @@ def happy_path():
== task_run.expected_start_time
)
assert pending.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
pending.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_happy_path.<locals>.happy_little_tree")
)
assert pending.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert pending.payload == {
"initial_state": None,
"intended": {"from": None, "to": "PENDING"},
@@ -112,11 +108,7 @@ def happy_path():
== task_run.expected_start_time
)
assert running.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
running.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_happy_path.<locals>.happy_little_tree")
)
assert running.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert running.payload == {
"intended": {"from": "PENDING", "to": "RUNNING"},
"initial_state": {
@@ -169,11 +161,7 @@ def happy_path():
== task_run.expected_start_time
)
assert completed.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
completed.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_happy_path.<locals>.happy_little_tree")
)
assert completed.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert completed.payload["task_run"].pop("estimated_run_time") > 0.0
assert (
pendulum.parse(completed.payload["task_run"].pop("start_time"))
@@ -262,11 +250,7 @@ def happy_path():
== task_run.expected_start_time
)
assert pending.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
pending.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_task_failure.<locals>.happy_little_tree")
)
assert pending.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert pending.payload == {
"initial_state": None,
"intended": {"from": None, "to": "PENDING"},
@@ -314,11 +298,7 @@ def happy_path():
== task_run.expected_start_time
)
assert running.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
running.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_task_failure.<locals>.happy_little_tree")
)
assert running.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert running.payload == {
"intended": {"from": "PENDING", "to": "RUNNING"},
"initial_state": {
@@ -374,11 +354,7 @@ def happy_path():
== task_run.expected_start_time
)
assert failed.payload["task_run"].pop("estimated_start_time_delta") > 0.0
assert (
failed.payload["task_run"]
.pop("task_key")
.startswith("test_task_state_change_task_failure.<locals>.happy_little_tree")
)
assert failed.payload["task_run"].pop("task_key").startswith("happy_little_tree")
assert failed.payload["task_run"].pop("estimated_run_time") > 0.0
assert (
pendulum.parse(failed.payload["task_run"].pop("start_time"))
22 changes: 0 additions & 22 deletions tests/test_background_tasks.py
@@ -1,6 +1,4 @@
import asyncio
import inspect
import os
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator, Iterable, Tuple
@@ -26,7 +24,6 @@
temporary_settings,
)
from prefect.task_worker import TaskWorker
from prefect.utilities.hashing import hash_objects

if TYPE_CHECKING:
from prefect.client.orchestration import PrefectClient
@@ -447,22 +444,3 @@ async def bar(x: int, mappable: Iterable) -> Tuple[int, Iterable]:
"parameters": {"x": i + 1, "mappable": ["some", "iterable"]},
"context": mock.ANY,
}


class TestTaskKey:
def test_task_key_includes_qualname_and_source_file_hash(self):
def some_fn():
pass

t = Task(fn=some_fn)
source_file = os.path.abspath(inspect.getsourcefile(some_fn))
task_origin_hash = hash_objects(t.name, source_file)
assert t.task_key == f"{some_fn.__qualname__}-{task_origin_hash}"

def test_task_key_handles_unknown_source_file(self, monkeypatch):
def some_fn():
pass

monkeypatch.setattr(inspect, "getsourcefile", lambda x: None)
t = Task(fn=some_fn)
assert t.task_key == f"{some_fn.__qualname__}-unknown-source-file"