Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Switch to Connexion 3 framework #39055

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ jobs:
env:
HATCH_ENV: "test"
working-directory: ./clients/python
- name: Compile www assets
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to have assets compiled here to test Python client. Right now the API uses UI from Swagger that expects Javascript to be compiled in order to make API calls.

run: breeze compile-www-assets
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Generic comment: the tests seem to pass all thests on Python 3.9 + but the Pytest tests do not exit - likely some cleanup needs to happen there. For now switching to DEfault Python only but needs to be fixed before merge.

- name: "Install Airflow in editable mode with fab for webserver tests"
run: pip install -e ".[fab]"
- name: "Install Python client"
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def get_connection(*, connection_id: str, session: Session = NEW_SESSION) -> API
@provide_session
def get_connections(
*,
limit: int,
limit: int | None = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RobbeSneyders -> we found that the new Connexion 3 implements more validation and responds with error where no default values are provided for parameters that are no defined as optional (but they are provided by decorators - see above @format_parameters decorator - it will set the default value of the parameter if not set - but Connexion 3 will respond with "bad request" if the parameter has no default value. We fixed it by adding default None and reworking the decorator to check it and replace the default value (and apply range limits) if None is set, but I just wanted you to know and ask if that is intended behaviour/right way of fixing it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't immediately think of a change that could trigger this. I tried to reproduce it with a minimal example, but it works as expected:

def insert(**kwargs):
    def decorator(f):
        async def wrapped_function():
            return await f(**kwargs)
        return wrapped_function
    return decorator


@insert(name="Igor")
async def post_greeting(name: str):
    return f"Hello {name}", 200
paths:
  /greeting:
    post:
      operationId: hello.post_greeting
      parameters:
        - name: name
          in: query
          schema:
            type: string

Do you have a stack trace by any chance?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to reduce the complexity of this PR, could such simple bulk changes separated-out to s different PR and merged beforehand? Or would this influence/break the old Connexion setup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do that, yes @jscheffl eventually, yes but I would like to avoid "polluting" current code without knowing how we are proceeding with the whole PR. For me this is more of POC for comments and deciding what's next rather than something we want to actively start merging now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RobbeSneyders: yes:

Example stack trace here (from https://github.com/apache/airflow/actions/runs/8581603105/job/23518857077 for example).

ERROR    connexion.middleware.exceptions:exceptions.py:97 TypeError("get_dataset_events() missing 1 required keyword-only argument: 'limit'")
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      await app(scope, receive, sender)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/swagger_ui.py", line 222, in __call__
      await self.router(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 756, in __call__
      await self.middleware_stack(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 776, in app
      await route.handle(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 485, in handle
      await self.app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 756, in __call__
      await self.middleware_stack(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 806, in app
      await self.default(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/swagger_ui.py", line 235, in default_fn
      await self.app(original_scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/middleware/cors.py", line 85, in __call__
      await self.app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/middleware/cors.py", line 85, in __call__
      await self.app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/routing.py", line 154, in __call__
      await self.router(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 756, in __call__
      await self.middleware_stack(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 776, in app
      await route.handle(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 485, in handle
      await self.app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 756, in __call__
      await self.middleware_stack(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 776, in app
      await route.handle(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 297, in handle
      await self.app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/routing.py", line 48, in __call__
      await self.next_app(original_scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/abstract.py", line 264, in __call__
      return await operation(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/security.py", line 106, in __call__
      await self.next_app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/abstract.py", line 264, in __call__
      return await operation(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/request_validation.py", line 142, in __call__
      await self.next_app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/abstract.py", line 264, in __call__
      return await operation(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/lifespan.py", line 26, in __call__
      await self.next_app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/abstract.py", line 264, in __call__
      return await operation(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/middleware/context.py", line 25, in __call__
      await self.next_app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/connexion/apps/flask.py", line 151, in __call__
      return await self.asgi_app(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/a2wsgi/wsgi.py", line 165, in __call__
      return await responder(scope, receive, send)
    File "/usr/local/lib/python3.8/site-packages/a2wsgi/wsgi.py", line 200, in __call__
      await self.loop.run_in_executor(
    File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
      result = self.fn(*self.args, **self.kwargs)
    File "/usr/local/lib/python3.8/site-packages/a2wsgi/wsgi.py", line 256, in wsgi
      iterable = self.app(environ, start_response)
    File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2532, in wsgi_app
      response = self.handle_exception(e)
    File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
      response = self.full_dispatch_request()
    File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
      rv = self.handle_user_exception(e)
    File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
      rv = self.dispatch_request()
    File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
      return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
    File "/usr/local/lib/python3.8/site-packages/connexion/apps/flask.py", line 68, in __call__
      return self.fn(*args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/connexion/decorators/main.py", line 134, in wrapper
      return decorated_function(request)
    File "/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line 171, in wrapper
      handler_response = function(*args, **kwargs)
    File "/usr/local/lib/python3.8/site-packages/connexion/decorators/parameter.py", line 87, in wrapper
      return function(**kwargs)
    File "/usr/local/lib/python3.8/site-packages/connexion/decorators/main.py", line 123, in wrapper
      return function(*args, **kwargs)
    File "/opt/airflow/airflow/api_connexion/security.py", line 182, in decorated
      return _requires_access(
    File "/opt/airflow/airflow/api_connexion/security.py", line 92, in _requires_access
      return func(*args, **kwargs)
    File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
      return func(*args, session=session, **kwargs)
    File "/opt/airflow/airflow/api_connexion/parameters.py", line 104, in wrapped_function
      return func(*args, **kwargs)
  TypeError: get_dataset_events() missing 1 required keyword-only argument: 'limit'

offset: int = 0,
order_by: str = "id",
session: Session = NEW_SESSION,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get_dag_details(
@provide_session
def get_dags(
*,
limit: int,
limit: int | None = None,
offset: int = 0,
tags: Collection[str] | None = None,
dag_id_pattern: str | None = None,
Expand Down
11 changes: 7 additions & 4 deletions airflow/api_connexion/endpoints/dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Sequence

from flask import Response, current_app
from connexion import NoContent
from flask import current_app
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy import exc, select

Expand All @@ -39,7 +40,9 @@

@security.requires_access_dag("PUT")
@provide_session
def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
def reparse_dag_file(
*, file_token: str, session: Session = NEW_SESSION
) -> tuple[str | NoContent, HTTPStatus]:
"""Request re-parsing a DAG file."""
secret_key = current_app.config["SECRET_KEY"]
auth_s = URLSafeSerializer(secret_key)
Expand All @@ -65,5 +68,5 @@ def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Resp
session.commit()
except exc.IntegrityError:
session.rollback()
return Response("Duplicate request", HTTPStatus.CREATED)
return Response(status=HTTPStatus.CREATED)
return "Duplicate request", HTTPStatus.CREATED
return NoContent, HTTPStatus.CREATED
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_warning_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
@provide_session
def get_dag_warnings(
*,
limit: int,
limit: int | None = None,
dag_id: str | None = None,
warning_type: str | None = None,
offset: int | None = None,
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/endpoints/dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
@provide_session
def get_datasets(
*,
limit: int,
limit: int | None = None,
offset: int = 0,
uri_pattern: str | None = None,
dag_ids: str | None = None,
Expand Down Expand Up @@ -113,11 +113,11 @@ def get_datasets(


@security.requires_access_dataset("GET")
@provide_session
@format_parameters({"limit": check_limit})
@provide_session
def get_dataset_events(
*,
limit: int,
limit: int | None = None,
offset: int = 0,
order_by: str = "timestamp",
dataset_id: int | None = None,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_event_logs(
included_events: str | None = None,
before: str | None = None,
after: str | None = None,
limit: int,
limit: int | None = None,
offset: int | None = None,
order_by: str = "event_log_id",
session: Session = NEW_SESSION,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/import_error_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) ->
@provide_session
def get_import_errors(
*,
limit: int,
limit: int | None = None,
offset: int | None = None,
order_by: str = "import_error_id",
session: Session = NEW_SESSION,
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_connexion/endpoints/log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ def get_log(
logs = logs[0] if task_try_number is not None else logs
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(key).dumps(metadata) # type: ignore[assignment]
return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
return Response(
logs_schema.dumps(LogResponseObject(continuation_token=token, content=logs)),
headers={"Content-Type": "application/json"},
)
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)

Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/pool_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse:
@provide_session
def get_pools(
*,
limit: int,
limit: int | None = None,
order_by: str = "id",
offset: int | None = None,
session: Session = NEW_SESSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def _apply_range_filter(query: Select, key: ClauseElement, value_range: tuple[T,
@provide_session
def get_task_instances(
*,
limit: int,
limit: int | None = None,
dag_id: str | None = None,
dag_run_id: str | None = None,
execution_date_gte: str | None = None,
Expand Down
55 changes: 23 additions & 32 deletions airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Any

import werkzeug
from connexion import FlaskApi, ProblemException, problem
from connexion import ProblemException, problem

from airflow.utils.docs import get_docs_url

if TYPE_CHECKING:
import flask
from connexion.lifecycle import ConnexionRequest, ConnexionResponse

doc_link = get_docs_url("stable-rest-api-ref.html")

Expand All @@ -40,37 +39,29 @@
}


def common_error_handler(exception: BaseException) -> flask.Response:
def problem_error_handler(_request: ConnexionRequest, exception: ProblemException) -> ConnexionResponse:
"""Use to capture connexion exceptions and add link to the type field."""
if isinstance(exception, ProblemException):
link = EXCEPTIONS_LINK_MAP.get(exception.status)
if link:
response = problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=link,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
else:
response = problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=exception.type,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
link = EXCEPTIONS_LINK_MAP.get(exception.status)
if link:
return problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=link,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
else:
if not isinstance(exception, werkzeug.exceptions.HTTPException):
exception = werkzeug.exceptions.InternalServerError()

response = problem(title=exception.name, detail=exception.description, status=exception.code)

return FlaskApi.get_response(response)
return problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=exception.type,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)


class NotFound(ProblemException):
Expand Down
61 changes: 35 additions & 26 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,10 @@ paths:
responses:
"204":
description: Success.
content:
text/html:
schema:
type: string
"400":
$ref: "#/components/responses/BadRequest"
"401":
Expand Down Expand Up @@ -1831,6 +1835,10 @@ paths:
responses:
"204":
description: Success.
content:
text/html:
schema:
type: string
"400":
$ref: "#/components/responses/BadRequest"
"401":
Expand Down Expand Up @@ -1973,8 +1981,8 @@ paths:
response = self.client.get(
request_url,
query_string={"token": token},
headers={"Accept": "text/plain"},
environ_overrides={"REMOTE_USER": "test"},
headers={"Accept": "text/plain","REMOTE_USER": "test"},

)
continuation_token = response.json["continuation_token"]
metadata = URLSafeSerializer(key).loads(continuation_token)
Expand Down Expand Up @@ -2108,7 +2116,7 @@ paths:
properties:
content:
type: string
plain/text:
text/plain:
potiuk marked this conversation as resolved.
Show resolved Hide resolved
schema:
type: string

Expand Down Expand Up @@ -2194,29 +2202,6 @@ paths:
"403":
$ref: "#/components/responses/PermissionDenied"

/datasets/{uri}:
parameters:
- $ref: "#/components/parameters/DatasetURI"
get:
summary: Get a dataset
description: Get a dataset by uri.
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: get_dataset
tags: [Dataset]
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/Dataset"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/datasets/events:
get:
summary: Get dataset events
Expand Down Expand Up @@ -2274,6 +2259,30 @@ paths:
'404':
$ref: '#/components/responses/NotFound'

/datasets/{uri}:
potiuk marked this conversation as resolved.
Show resolved Hide resolved
parameters:
- $ref: "#/components/parameters/DatasetURI"
get:
summary: Get a dataset
description: Get a dataset by uri.
x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
operationId: get_dataset
tags: [Dataset]
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/Dataset"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"


/config:
get:
summary: Get current configuration
Expand Down
14 changes: 9 additions & 5 deletions airflow/api_connexion/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def validate_istimezone(value: datetime) -> None:
raise BadRequest("Invalid datetime format", detail="Naive datetime is disallowed")


def format_datetime(value: str) -> datetime:
def format_datetime(value: str | None) -> datetime | None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We needdd to convert the decorators to handle None as default value - see https://github.com/apache/airflow/pull/39055/files#r1566990719

"""
Format datetime objects.

Expand All @@ -50,6 +50,8 @@ def format_datetime(value: str) -> datetime:

This should only be used within connection views because it raises 400
"""
if value is None:
return None
value = value.strip()
if value[-1] != "Z":
value = value.replace(" ", "+")
Expand All @@ -59,7 +61,7 @@ def format_datetime(value: str) -> datetime:
raise BadRequest("Incorrect datetime argument", detail=str(err))


def check_limit(value: int) -> int:
def check_limit(value: int | None) -> int:
"""
Check the limit does not exceed configured value.

Expand All @@ -68,7 +70,8 @@ def check_limit(value: int) -> int:
"""
max_val = conf.getint("api", "maximum_page_limit") # user configured max page limit
fallback = conf.getint("api", "fallback_page_limit")

if value is None:
return fallback
if value > max_val:
log.warning(
"The limit param value %s passed in API exceeds the configured maximum page limit %s",
Expand Down Expand Up @@ -99,8 +102,9 @@ def format_parameters_decorator(func: T) -> T:
@wraps(func)
def wrapped_function(*args, **kwargs):
for key, formatter in params_formatters.items():
if key in kwargs:
kwargs[key] = formatter(kwargs[key])
value = formatter(kwargs.get(key))
if value:
kwargs[key] = value
return func(*args, **kwargs)

return cast(T, wrapped_function)
Expand Down
13 changes: 11 additions & 2 deletions airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from abc import abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Container, Literal, Sequence
from typing import TYPE_CHECKING, Any, Container, Literal, Sequence

from flask_appbuilder.menu import MenuItem
from sqlalchemy import select
Expand Down Expand Up @@ -82,7 +82,7 @@ def get_cli_commands() -> list[CLICommand]:
return []

def get_api_endpoints(self) -> None | Blueprint:
"""Return API endpoint(s) definition for the auth manager."""
"""Return API endpoint(s) definition for the auth manager for Airflow 2.9."""
return None

def get_user_name(self) -> str:
Expand Down Expand Up @@ -442,3 +442,12 @@ def security_manager(self) -> AirflowSecurityManagerV2:
from airflow.www.security_manager import AirflowSecurityManagerV2

return AirflowSecurityManagerV2(self.appbuilder)

def get_auth_manager_api_specification(self) -> tuple[str | None, dict[Any, Any]]:
"""
Return the mount point and specification (openapi) for auth manager contributed API (Airflow 2.10).

By default is raises NotImplementedError which produces a warning in airflow logs when auth manager is
initialized, but you can return None, {} if the auth manager does not contribute API.
"""
raise NotImplementedError
Loading
Loading