diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py index 7df7dd91908a7..2d0c86a0ce1af 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py @@ -353,8 +353,9 @@ def enqueue_connection_test( else: effective_team = test_body.team_name + auth_method = "PUT" if existing is not None and test_body.commit_on_success else "POST" if not get_auth_manager().is_authorized_connection( - method="POST", + method=auth_method, details=ConnectionDetails(conn_id=test_body.connection_id, team_name=effective_team), user=user, ): diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py index 4dd16d3b30bad..6930039e1abbc 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py @@ -25,6 +25,7 @@ from sqlalchemy import func, select from sqlalchemy.orm import Session +from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager from airflow.api_fastapi.core_api.datamodels.common import BulkActionResponse, BulkBody from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody from airflow.api_fastapi.core_api.services.public.connections import BulkConnectionService @@ -1302,6 +1303,27 @@ def test_post_stores_commit_on_success(self, test_client, session): assert ct is not None assert ct.commit_on_success is True + @mock.patch("airflow.api_fastapi.core_api.routes.public.connections.get_auth_manager", autospec=True) + @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"}) + def test_post_commit_on_success_existing_connection_requires_edit_permission( + self, mock_get_auth_manager, test_client, session + ): + """POST /connections/enqueue-test requires edit permission when it can update a connection.""" + self.create_connection() + mock_auth_manager = mock.create_autospec(BaseAuthManager, instance=True) + mock_get_auth_manager.return_value = mock_auth_manager + mock_auth_manager.is_authorized_connection.side_effect = lambda *, method, details, user: ( + method == "POST" + ) + body = {**self.TEST_REQUEST_BODY, "commit_on_success": True} + + response = test_client.post("/connections/enqueue-test", json=body) + + assert response.status_code == 403 + mock_auth_manager.is_authorized_connection.assert_called_once() + assert mock_auth_manager.is_authorized_connection.call_args.kwargs["method"] == "PUT" + assert session.scalar(select(ConnectionTestRequest)) is None + @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"}) def test_post_returns_409_for_duplicate_active_test(self, test_client, session): """POST returns 409 when there's already an active test for the same connection_id."""