Commit 9c6fb6b

Ulada Zakharava authored and potiuk committed
Switch to Connexion 3 framework
This is a huge PR, the result of over 100 commits made by a number of people in #36052 and #37638. It switches to Connexion 3 as the driving backend implementation for both the Airflow REST APIs and the Flask app that powers the Airflow UI. It should be largely backwards compatible in the behaviour of both the APIs and the Airflow Webserver views, but due to decisions made by the Connexion 3 maintainers it heavily changes the technology stack used under the hood:

1) The Connexion App is an OpenAPI spec-first framework that uses ASGI as the interface between the webserver and the Python web application. ASGI is the asynchronous successor of WSGI.
2) Connexion itself uses Starlette to run asynchronous web services in Python.
3) We continue using the Gunicorn application server, which still uses the WSGI standard - this means we can keep using Flask - and we use the standard Uvicorn ASGI webserver that converts the ASGI interface to the WSGI interface of Gunicorn.

Some of the problems handled in this PR:

Fix session handling
There were two problems with session handling:
* get_session_cookie did not return the right cookie - it returned the literal "session" string. The fix was to change cookie_jar into cookie.jar, because that is apparently where Starlette's TestClient holds the cookies (visible when you debug).
* The client does not accept a "set_cookie" method - it accepts cookies passed via a "cookies" dictionary, as the usual httpx client does (see https://www.starlette.io/testclient/) - so we have to set the cookie directly in the get method.
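A minimal illustration of the cookie handling described above (hypothetical app, route, and cookie values - not code from this PR): Starlette's TestClient is an httpx client, so server-set cookies live in client.cookies.jar and request cookies are passed via the cookies argument rather than a set_cookie() call.

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette.testclient import TestClient


async def whoami(request):
    # echo back the session cookie the client sent
    return JSONResponse({"session": request.cookies.get("session")})


app = Starlette(routes=[Route("/whoami", whoami)])
client = TestClient(app)

# cookies are passed directly to the request method (httpx style), not via set_cookie()
response = client.get("/whoami", cookies={"session": "fake-session-id"})
assert response.json() == {"session": "fake-session-id"}

# cookies set by the server end up in the underlying httpx cookie jar
for cookie in client.cookies.jar:
    print(cookie.name, cookie.value)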
Add "flask_client_with_login" for tests that need a Flask client
Some tests require functionality that is not available in the Starlette test client because they use features specific to the Flask test client - for those we have an option to get a Flask test client instead of a Starlette one.

Fix error handling for the new Connexion 3 approach
Error handling for the Connexion 3 integration needed to be reworked. It now behaves much the same as it does in main:
* for API errors we get application/problem+json responses
* for UI errors we get rendered views
* for redirects we get the correct Location header (it had been missing)
* the API error handler was also missing from the available middlewares in the www tests
This should fix all test_views_base.py tests, which were failing because the Location header was missing on redirects.

Fix wrong response in test_views_cluster_activity
The problem in the test was that the Starlette test client opens a new connection and starts a new session, while the Flask test client uses the same database session. The test did not show data because the data was not committed and the session was not closed - which also failed local sqlite tests with a "database is locked" error.

Fix test_extra_links
The tests were failing, again because the created dagrun was not committed and the session was not closed. This worked with the Flask client, which accidentally used the same session, but did not work with the Starlette test client. It also caused "database locked" errors in sqlite / local tests.

Switch to non-deprecated auth manager

Fix to test_views_log.py
This PR partially fixes sessions and the request parameter for test_views_log. Some tests are still failing, but for different reasons - to be investigated.

Fix views_custom_user_views tests
The problem in those tests was that the check in the security manager relied on the security manager being shared between the client and the test Flask application - because they were coming from the same Flask app. But when we use Starlette, the call goes to a newly started process and the user is deleted in the database - so the shortcut of checking the security manager did not work. The change is that we now check whether the user is deleted by calling /users/show (we need a new users READ permission for that) - this way we go to the database and check whether the user was indeed deleted.

Fix test_task_instance_endpoint tests
There were two reasons the tests failed:
* when the Job was added to the task instance, the task instance was not merged into the session, which means the commit did not store the added Job
* some of the tests expected a call with a specific session and failed because the session was different. Replacing the session with mock.ANY makes the assertion accept any value for this parameter - we will have a different session when the call is made via ASGI/Starlette (a short illustration follows right after this message)

Fix parameter validation
* added a default value for the limit parameter across the board. Connexion 3 does not like it when the parameter has no default and we do not provide one - even if our custom decorator was adding it. Adding a default value and updating our decorator to treat None as the default fixed a number of problems where limits were not passed (see the sketch after the connection_endpoint.py diff below)
* swapped the OpenAPI specification order for /datasets/{uri} and /dataset/events. Since `{uri}` was defined first, Connexion matched `events` with `{uri}` and chose the parameter definitions from `{uri}` rather than from events

Fix test_log_endpoint tests
The problem here was that some sessions should be committed/closed, and also, in order to run the tests standalone, we wanted to create log templates in the database, as the tests implicitly relied on log templates created by other tests.

Fix test_views_dagrun, test_views_tasks and test_views_log
Fixed by switching to the Flask client for testing rather than Starlette. The Starlette client in this case has side effects that also impact sqlite, with the session being created in a different thread and deleted by the close_all_sessions fixture.

Co-authored-by: sudipto baral <[email protected]>
Co-authored-by: satoshi-sh <[email protected]>
Co-authored-by: Maksim Yermakou <[email protected]>
Co-authored-by: Ulada Zakharava <[email protected]>

Better API initialization, including vending of the API specification. The way paths are added and initialized is better (for example, FAB contributes its paths via a new method in the Auth Manager). This also adds back-compatibility so the FAB auth manager continues working on Airflow 2.9.
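To illustrate the mock.ANY technique mentioned in the test_task_instance_endpoint fix above - a minimal, hypothetical sketch (not code from this PR): mock.ANY compares equal to any value, so the assertion no longer pins the exact session object created by a particular test client.

from unittest import mock

# hypothetical endpoint call whose "session" argument differs per test client
fake_endpoint = mock.MagicMock()
fake_endpoint(dag_id="example_dag", session=object())

# mock.ANY compares equal to anything, so the session argument is no longer pinned
fake_endpoint.assert_called_once_with(dag_id="example_dag", session=mock.ANY)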
1 parent 67e1ce6 · commit 9c6fb6b

127 files changed: +2931 / −2652 lines


.github/workflows/basic-tests.yml

Lines changed: 2 additions & 0 deletions
@@ -148,6 +148,8 @@ jobs:
         env:
           HATCH_ENV: "test"
         working-directory: ./clients/python
+      - name: Compile www assets
+        run: breeze compile-www-assets
       - name: "Install Airflow in editable mode with fab for webserver tests"
         run: pip install -e ".[fab]"
       - name: "Install Python client"

airflow/api_connexion/endpoints/connection_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ def get_connection(*, connection_id: str, session: Session = NEW_SESSION) -> API
 @provide_session
 def get_connections(
     *,
-    limit: int,
+    limit: int | None = None,
     offset: int = 0,
     order_by: str = "id",
     session: Session = NEW_SESSION,
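For context on the limit changes in this and the following diffs - a simplified, hypothetical sketch (not the actual check_limit/format_parameters implementation) of the pattern the commit message describes: the endpoint declares limit as optional so Connexion 3 can always bind the parameter, and a decorator replaces None with a fallback page size.

from __future__ import annotations

from functools import wraps

FALLBACK_PAGE_LIMIT = 100   # hypothetical default page size
MAXIMUM_PAGE_LIMIT = 1000   # hypothetical upper bound


def apply_default_limit(func):
    """Treat a missing/None ``limit`` as the fallback and cap oversized values."""

    @wraps(func)
    def wrapper(*args, limit: int | None = None, **kwargs):
        if limit is None:
            limit = FALLBACK_PAGE_LIMIT
        return func(*args, limit=min(limit, MAXIMUM_PAGE_LIMIT), **kwargs)

    return wrapper


@apply_default_limit
def get_connections(*, limit: int | None = None, offset: int = 0):
    return {"limit": limit, "offset": offset}


print(get_connections())            # {'limit': 100, 'offset': 0}
print(get_connections(limit=5000))  # {'limit': 1000, 'offset': 0}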

airflow/api_connexion/endpoints/dag_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ def get_dag_details(
 @provide_session
 def get_dags(
     *,
-    limit: int,
+    limit: int | None = None,
     offset: int = 0,
     tags: Collection[str] | None = None,
     dag_id_pattern: str | None = None,

airflow/api_connexion/endpoints/dag_parsing.py

Lines changed: 7 additions & 4 deletions
@@ -19,7 +19,8 @@
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Sequence

-from flask import Response, current_app
+from connexion import NoContent
+from flask import current_app
 from itsdangerous import BadSignature, URLSafeSerializer
 from sqlalchemy import exc, select

@@ -39,7 +40,9 @@

 @security.requires_access_dag("PUT")
 @provide_session
-def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
+def reparse_dag_file(
+    *, file_token: str, session: Session = NEW_SESSION
+) -> tuple[str | NoContent, HTTPStatus]:
     """Request re-parsing a DAG file."""
     secret_key = current_app.config["SECRET_KEY"]
     auth_s = URLSafeSerializer(secret_key)
@@ -65,5 +68,5 @@ def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Resp
         session.commit()
     except exc.IntegrityError:
         session.rollback()
-        return Response("Duplicate request", HTTPStatus.CREATED)
-    return Response(status=HTTPStatus.CREATED)
+        return "Duplicate request", HTTPStatus.CREATED
+    return NoContent, HTTPStatus.CREATED
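As a side note on the return convention in this diff - a minimal, hypothetical handler (not from this PR) showing that a Connexion 3 operation can return a (body, status) tuple, or the NoContent sentinel for an empty body, instead of building a flask.Response:

from http import HTTPStatus

from connexion import NoContent


def reparse_example_file(*, file_token: str):
    # hypothetical duplicate-request check
    if file_token == "already-queued":
        return "Duplicate request", HTTPStatus.CREATED
    return NoContent, HTTPStatus.CREATED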

airflow/api_connexion/endpoints/dag_warning_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@
 @provide_session
 def get_dag_warnings(
     *,
-    limit: int,
+    limit: int | None = None,
     dag_id: str | None = None,
     warning_type: str | None = None,
     offset: int | None = None,

airflow/api_connexion/endpoints/dataset_endpoint.py

Lines changed: 3 additions & 3 deletions
@@ -82,7 +82,7 @@ def get_dataset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
 @provide_session
 def get_datasets(
     *,
-    limit: int,
+    limit: int | None = None,
     offset: int = 0,
     uri_pattern: str | None = None,
     dag_ids: str | None = None,
@@ -113,11 +113,11 @@ def get_datasets(


 @security.requires_access_dataset("GET")
-@provide_session
 @format_parameters({"limit": check_limit})
+@provide_session
 def get_dataset_events(
     *,
-    limit: int,
+    limit: int | None = None,
     offset: int = 0,
     order_by: str = "timestamp",
     dataset_id: int | None = None,

airflow/api_connexion/endpoints/event_log_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ def get_event_logs(
     included_events: str | None = None,
     before: str | None = None,
     after: str | None = None,
-    limit: int,
+    limit: int | None = None,
     offset: int | None = None,
     order_by: str = "event_log_id",
     session: Session = NEW_SESSION,

airflow/api_connexion/endpoints/import_error_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) ->
 @provide_session
 def get_import_errors(
     *,
-    limit: int,
+    limit: int | None = None,
     offset: int | None = None,
     order_by: str = "import_error_id",
     session: Session = NEW_SESSION,
session: Session = NEW_SESSION,

airflow/api_connexion/endpoints/log_endpoint.py

Lines changed: 4 additions & 1 deletion
@@ -107,7 +107,10 @@ def get_log(
         logs = logs[0] if task_try_number is not None else logs
         # we must have token here, so we can safely ignore it
         token = URLSafeSerializer(key).dumps(metadata)  # type: ignore[assignment]
-        return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
+        return Response(
+            logs_schema.dumps(LogResponseObject(continuation_token=token, content=logs)),
+            headers={"Content-Type": "application/json"},
+        )
     # text/plain. Stream
     logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)


airflow/api_connexion/endpoints/pool_endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse:
 @provide_session
 def get_pools(
     *,
-    limit: int,
+    limit: int | None = None,
     order_by: str = "id",
     offset: int | None = None,
     session: Session = NEW_SESSION,
