diff --git a/src/auth0_server_python/auth_schemes/__init__.py b/src/auth0_server_python/auth_schemes/__init__.py index 1c2c869..ef37613 100644 --- a/src/auth0_server_python/auth_schemes/__init__.py +++ b/src/auth0_server_python/auth_schemes/__init__.py @@ -1,3 +1,4 @@ from .bearer_auth import BearerAuth +from .dpop_auth import DPoPAuth -__all__ = ["BearerAuth"] +__all__ = ["BearerAuth", "DPoPAuth"] diff --git a/src/auth0_server_python/auth_schemes/dpop_auth.py b/src/auth0_server_python/auth_schemes/dpop_auth.py new file mode 100644 index 0000000..a0e0a19 --- /dev/null +++ b/src/auth0_server_python/auth_schemes/dpop_auth.py @@ -0,0 +1,88 @@ +import base64 +import hashlib +import time +import uuid + +import httpx +from jwcrypto import jwk +from jwcrypto import jwt as jwcrypto_jwt + + +def _base64url(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + + +def make_dpop_proof_for_token_endpoint(key: "jwk.JWK", method: str, url: str, nonce: str = None) -> str: + """ + Build a DPoP proof JWT for use at the token endpoint (RFC 9449 §4.2). + Unlike resource-server proofs, token-endpoint proofs do NOT include `ath` + because no access token exists yet at issuance time. + """ + public_jwk = key.export_public(as_dict=True) + htu = url.split("?")[0].split("#")[0] + header = {"typ": "dpop+jwt", "alg": "ES256", "jwk": public_jwk} + payload = { + "jti": str(uuid.uuid4()), + "htm": method.upper(), + "htu": htu, + "iat": int(time.time()), + } + if nonce is not None: + payload["nonce"] = nonce + token = jwcrypto_jwt.JWT(header=header, claims=payload) + token.make_signed_token(key) + return token.serialize() + + +class DPoPAuth(httpx.Auth): + def __init__(self, token: str, key: "jwk.JWK") -> None: + public_jwk = key.export_public(as_dict=True) + if public_jwk.get("kty") != "EC" or public_jwk.get("crv") != "P-256": + raise ValueError("DPoP key must be an EC P-256 key") + try: + token.encode("ascii") + except UnicodeEncodeError: + raise ValueError("Access token must contain only ASCII characters") + self._token = token + self._key = key + self._public_jwk = public_jwk + + def __repr__(self) -> str: + return "DPoPAuth(token=[REDACTED], key=[REDACTED])" + + def __str__(self) -> str: + return "DPoPAuth(token=[REDACTED], key=[REDACTED])" + + def auth_flow(self, request: httpx.Request): + proof = self._make_proof(request.method, str(request.url)) + request.headers["Authorization"] = f"DPoP {self._token}" + request.headers["DPoP"] = proof + response = yield request + + # RFC 9449 §8.2 — server-nonce retry + if ( + response.status_code == 401 + and response.headers.get("DPoP-Nonce") + ): + nonce = response.headers["DPoP-Nonce"] + request.headers["DPoP"] = self._make_proof(request.method, str(request.url), nonce=nonce) + yield request + + def _make_proof(self, method: str, url: str, nonce: str = None) -> str: + htu = url.split("?")[0].split("#")[0] + ath = _base64url(hashlib.sha256(self._token.encode("ascii")).digest()) + + header = {"typ": "dpop+jwt", "alg": "ES256", "jwk": self._public_jwk} + payload = { + "jti": str(uuid.uuid4()), + "htm": method.upper(), + "htu": htu, + "iat": int(time.time()), + "ath": ath, + } + if nonce is not None: + payload["nonce"] = nonce + + token = jwcrypto_jwt.JWT(header=header, claims=payload) + token.make_signed_token(self._key) + return token.serialize() diff --git a/src/auth0_server_python/auth_server/my_account_client.py b/src/auth0_server_python/auth_server/my_account_client.py index 499b981..5e10b60 100644 --- a/src/auth0_server_python/auth_server/my_account_client.py +++ b/src/auth0_server_python/auth_server/my_account_client.py @@ -1,16 +1,25 @@ - -from typing import Optional +import json +from typing import TYPE_CHECKING, Optional +from urllib.parse import quote, unquote, urlparse import httpx from auth0_server_python.auth_schemes.bearer_auth import BearerAuth +from auth0_server_python.auth_schemes.dpop_auth import DPoPAuth from auth0_server_python.auth_types import ( + AuthenticationMethod, CompleteConnectAccountRequest, CompleteConnectAccountResponse, ConnectAccountRequest, ConnectAccountResponse, + EnrollAuthenticationMethodRequest, + EnrollmentChallengeResponse, + GetFactorsResponse, + ListAuthenticationMethodsResponse, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + UpdateAuthenticationMethodRequest, + VerifyAuthenticationMethodRequest, ) from auth0_server_python.error import ( ApiError, @@ -19,6 +28,18 @@ MyAccountApiError, ) +if TYPE_CHECKING: + from jwcrypto import jwk + + +def _make_auth( + access_token: str, + dpop_key: Optional["jwk.JWK"] = None, +) -> httpx.Auth: + if dpop_key is not None: + return DPoPAuth(access_token, dpop_key) + return BearerAuth(access_token) + class MyAccountClient: """ @@ -340,3 +361,397 @@ async def list_connected_account_connections( f"Connected Accounts list connections request failed: {str(e) or 'Unknown error'}", e ) + + # ============================================================================ + # AUTHENTICATION METHODS & FACTORS (Passkey / MyAccount API) + # ============================================================================ + + async def get_factors( + self, + access_token: str, + dpop_key: Optional["jwk.JWK"] = None, + ) -> GetFactorsResponse: + """ + Retrieve the list of factors available for enrollment. + + Args: + access_token: User's access token (scope: read:me:factors). + dpop_key: Optional EC P-256 key for DPoP-bound token presentation. + + Returns: + GetFactorsResponse containing the available factors. + + Raises: + MissingRequiredArgumentError: If access_token is not provided. + MyAccountApiError: If the API returns an error response. + ApiError: If the request fails due to network or other issues. + """ + if not access_token: + raise MissingRequiredArgumentError("access_token") + + try: + async with self._get_http_client() as client: + response = await client.get( + url=f"{self.audience}v1/factors", + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "get_factors_error", + f"Get factors failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + return GetFactorsResponse.model_validate(response.json()) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "get_factors_error", + "Get factors request failed", + e, + ) + + async def list_authentication_methods( + self, + access_token: str, + type_filter: Optional[str] = None, + dpop_key: Optional["jwk.JWK"] = None, + ) -> ListAuthenticationMethodsResponse: + if not access_token: + raise MissingRequiredArgumentError("access_token") + + try: + async with self._get_http_client() as client: + params = {} + if type_filter: + params["type"] = type_filter + + response = await client.get( + url=f"{self.audience}v1/authentication-methods", + params=params, + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "list_authentication_methods_error", + f"List authentication methods failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + return ListAuthenticationMethodsResponse.model_validate(response.json()) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "list_authentication_methods_error", + "List authentication methods request failed", + e, + ) + + async def get_authentication_method( + self, + access_token: str, + authentication_method_id: str, + dpop_key: Optional["jwk.JWK"] = None, + ) -> AuthenticationMethod: + if not access_token: + raise MissingRequiredArgumentError("access_token") + if not authentication_method_id: + raise MissingRequiredArgumentError("authentication_method_id") + + try: + async with self._get_http_client() as client: + response = await client.get( + url=f"{self.audience}v1/authentication-methods/{quote(authentication_method_id, safe='')}", + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "get_authentication_method_error", + f"Get authentication method failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + return AuthenticationMethod.model_validate(response.json()) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "get_authentication_method_error", + "Get authentication method request failed", + e, + ) + + async def delete_authentication_method( + self, + access_token: str, + authentication_method_id: str, + dpop_key: Optional["jwk.JWK"] = None, + ) -> None: + if not access_token: + raise MissingRequiredArgumentError("access_token") + if not authentication_method_id: + raise MissingRequiredArgumentError("authentication_method_id") + + try: + async with self._get_http_client() as client: + response = await client.delete( + url=f"{self.audience}v1/authentication-methods/{quote(authentication_method_id, safe='')}", + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 204: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "delete_authentication_method_error", + f"Delete authentication method failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "delete_authentication_method_error", + "Delete authentication method request failed", + e, + ) + + async def update_authentication_method( + self, + access_token: str, + authentication_method_id: str, + request: UpdateAuthenticationMethodRequest, + dpop_key: Optional["jwk.JWK"] = None, + ) -> AuthenticationMethod: + if not access_token: + raise MissingRequiredArgumentError("access_token") + if not authentication_method_id: + raise MissingRequiredArgumentError("authentication_method_id") + if request is None: + raise MissingRequiredArgumentError("request") + + try: + async with self._get_http_client() as client: + response = await client.patch( + url=f"{self.audience}v1/authentication-methods/{quote(authentication_method_id, safe='')}", + json=request.model_dump(exclude_none=True), + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "update_authentication_method_error", + f"Update authentication method failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + return AuthenticationMethod.model_validate(response.json()) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "update_authentication_method_error", + "Update authentication method request failed", + e, + ) + + async def enroll_authentication_method( + self, + access_token: str, + request: EnrollAuthenticationMethodRequest, + dpop_key: Optional["jwk.JWK"] = None, + ) -> EnrollmentChallengeResponse: + """Step 1 of 2: Start enrollment (POST /me/v1/authentication-methods). + + For passkey enrollment, pass the returned authn_params_public_key to + navigator.credentials.create(), then call verify_authentication_method() + with the auth_session and credential result. + + Requires scope: create:me:authentication-methods + """ + if not access_token: + raise MissingRequiredArgumentError("access_token") + if request is None: + raise MissingRequiredArgumentError("request") + + try: + async with self._get_http_client() as client: + response = await client.post( + url=f"{self.audience}v1/authentication-methods", + json=request.model_dump(exclude_none=True), + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 202: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "enroll_authentication_method_error", + f"Enroll authentication method failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + location = response.headers.get("location") + if not location: + raise ApiError( + "enroll_authentication_method_error", + "Enrollment succeeded (202) but Location header is missing", + ) + + parsed_path = urlparse(location).path.rstrip("/") + raw_id = parsed_path.rsplit("/", 1)[-1] if "/" in parsed_path else "" + authentication_method_id = unquote(raw_id) + if not authentication_method_id or authentication_method_id in ( + "authentication-methods", + "v1", + "me", + ): + raise ApiError( + "enroll_authentication_method_error", + "Enrollment succeeded (202) but could not extract ID from Location header", + ) + + try: + data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "enroll_authentication_method_error", + "Enrollment succeeded (202) but response body is not valid JSON", + ) + + auth_session = data.get("auth_session") + if not auth_session: + raise ApiError( + "enroll_authentication_method_error", + "Enrollment succeeded (202) but auth_session is missing from response", + ) + + return EnrollmentChallengeResponse.model_validate( + { + **data, + "authentication_method_id": authentication_method_id, + "auth_session": auth_session, + } + ) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "enroll_authentication_method_error", + "Enroll authentication method request failed", + e, + ) + + async def verify_authentication_method( + self, + access_token: str, + authentication_method_id: str, + request: VerifyAuthenticationMethodRequest, + dpop_key: Optional["jwk.JWK"] = None, + ) -> AuthenticationMethod: + """Step 2 of 2: Verify enrollment (POST /me/v1/authentication-methods/{id}/verify). + + Requires scope: create:me:authentication-methods + """ + if not access_token: + raise MissingRequiredArgumentError("access_token") + if not authentication_method_id: + raise MissingRequiredArgumentError("authentication_method_id") + if request is None: + raise MissingRequiredArgumentError("request") + + try: + async with self._get_http_client() as client: + response = await client.post( + url=f"{self.audience}v1/authentication-methods/{quote(authentication_method_id, safe='')}/verify", + json=request.model_dump(by_alias=True, exclude_none=True), + auth=_make_auth(access_token, dpop_key), + ) + + if response.status_code != 201: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise ApiError( + "verify_authentication_method_error", + f"Verify authentication method failed with status {response.status_code}", + ) + raise MyAccountApiError( + title=error_data.get("title", None), + type=error_data.get("type", None), + detail=error_data.get("detail", None), + status=error_data.get("status", None), + validation_errors=error_data.get("validation_errors", None), + ) + + return AuthenticationMethod.model_validate(response.json()) + + except Exception as e: + if isinstance(e, (MyAccountApiError, ApiError)): + raise + raise ApiError( + "verify_authentication_method_error", + "Verify authentication method request failed", + e, + ) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 91de45d..22f6d21 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -7,7 +7,10 @@ import json import time from collections import OrderedDict -from typing import Any, Callable, Generic, Optional, TypeVar, Union +from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar, Union + +if TYPE_CHECKING: + from jwcrypto import jwk from urllib.parse import parse_qs, urlencode, urlparse, urlunparse import httpx @@ -16,6 +19,7 @@ from authlib.integrations.httpx_client import AsyncOAuth2Client from pydantic import ValidationError +from auth0_server_python.auth_schemes.dpop_auth import make_dpop_proof_for_token_endpoint from auth0_server_python.auth_server.mfa_client import MfaClient from auth0_server_python.auth_server.my_account_client import MyAccountClient from auth0_server_python.auth_types import ( @@ -31,6 +35,12 @@ LogoutOptions, LogoutTokenClaims, MfaRequirements, + PasskeyAuthResponse, + PasskeyLoginChallengeResponse, + PasskeyLoginResult, + PasskeySignupChallengeResponse, + PasskeyTokenResponse, + PasskeyUserProfile, StartInteractiveLoginOptions, StateData, TokenExchangeResponse, @@ -54,6 +64,8 @@ MfaRequiredError, MissingRequiredArgumentError, MissingTransactionError, + PasskeyError, + PasskeyErrorCode, PollingApiError, StartLinkUserError, ) @@ -78,6 +90,9 @@ class ServerClient(Generic[TStoreOptions]): and token operations using Authlib for OIDC functionality. """ DEFAULT_AUDIENCE_STATE_KEY = "default" + GRANT_TYPE_PASSKEY = "urn:okta:params:oauth:grant-type:webauthn" + PASSKEY_REGISTER_PATH = "/passkey/register" + PASSKEY_CHALLENGE_PATH = "/passkey/challenge" # ============================================================================ # INITIALIZATION @@ -2483,3 +2498,334 @@ async def login_with_custom_token_exchange( def mfa(self) -> MfaClient: """Access the MFA client for multi-factor authentication operations.""" return self._mfa_client + + # ============================================================================ + # PASSKEY AUTHENTICATION + # ============================================================================ + + async def passkey_signup_challenge( + self, + user_profile: Optional[PasskeyUserProfile] = None, + connection: Optional[str] = None, + organization: Optional[str] = None, + user_metadata: Optional[dict[str, Any]] = None, + store_options: Optional[dict[str, Any]] = None, + ) -> PasskeySignupChallengeResponse: + """ + Step 1 of 2: Initiate a passkey signup challenge (POST /passkey/register). + + Pass the returned authn_params_public_key to navigator.credentials.create(), + then call signin_with_passkey() with the auth_session and credential result. + + Args: + user_profile: Optional user profile data (email, name, username, etc.). + Use PasskeyUserProfile — supports extra fields for forward compatibility. + connection: Auth0 database connection name (realm). + organization: Auth0 organization ID or name. + user_metadata: Optional custom metadata added at the root of the request body, + not nested inside user_profile (per Auth0 API spec). + store_options: Optional options for domain resolution. + + Returns: + PasskeySignupChallengeResponse with auth_session and authn_params_public_key. + + Raises: + PasskeyError: If the challenge request fails. + """ + try: + domain = await self._resolve_current_domain(store_options) + + body: dict[str, Any] = {"client_id": self._client_id} + if self._client_secret: + body["client_secret"] = self._client_secret + if user_profile: + body["user_profile"] = user_profile.model_dump(exclude_none=True) + if user_metadata: + body["user_metadata"] = user_metadata + if connection: + body["realm"] = connection + if organization: + body["organization"] = organization + + async with self._get_http_client() as client: + url = f"https://{domain}{self.PASSKEY_REGISTER_PATH}" + response = await client.post(url, json=body) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.CHALLENGE_FAILED, + f"Passkey signup challenge failed with status {response.status_code}", + ) + raise PasskeyError( + error_data.get("error", PasskeyErrorCode.CHALLENGE_FAILED), + error_data.get("error_description", "Passkey signup challenge failed"), + ) + + try: + data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.INVALID_RESPONSE, + "Failed to parse passkey signup challenge response as JSON", + ) + + return PasskeySignupChallengeResponse.model_validate(data) + + except Exception as e: + if isinstance(e, (PasskeyError, MissingRequiredArgumentError, ValidationError)): + raise + raise PasskeyError(PasskeyErrorCode.CHALLENGE_FAILED, "Passkey signup challenge failed", e) from e + + async def passkey_login_challenge( + self, + username: Optional[str] = None, + connection: Optional[str] = None, + organization: Optional[str] = None, + store_options: Optional[dict[str, Any]] = None, + ) -> PasskeyLoginChallengeResponse: + """ + Step 1 of 2: Initiate a passkey login challenge (POST /passkey/challenge). + + Pass the returned authn_params_public_key to navigator.credentials.get(), + then call signin_with_passkey() with the auth_session and credential result. + + Args: + username: Optional username hint for conditional UI. + connection: Auth0 database connection name (realm). + organization: Auth0 organization ID or name. + store_options: Optional options for domain resolution. + + Returns: + PasskeyLoginChallengeResponse with auth_session and authn_params_public_key. + + Raises: + ApiError: If the challenge request fails. + """ + try: + domain = await self._resolve_current_domain(store_options) + + body: dict[str, Any] = {"client_id": self._client_id} + if self._client_secret: + body["client_secret"] = self._client_secret + if username: + body["username"] = username + if connection: + body["realm"] = connection + if organization: + body["organization"] = organization + + async with self._get_http_client() as client: + url = f"https://{domain}{self.PASSKEY_CHALLENGE_PATH}" + response = await client.post(url, json=body) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.CHALLENGE_FAILED, + f"Passkey login challenge failed with status {response.status_code}", + ) + raise PasskeyError( + error_data.get("error", PasskeyErrorCode.CHALLENGE_FAILED), + error_data.get("error_description", "Passkey login challenge failed"), + ) + + try: + data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.INVALID_RESPONSE, + "Failed to parse passkey login challenge response as JSON", + ) + + return PasskeyLoginChallengeResponse.model_validate(data) + + except Exception as e: + if isinstance(e, (PasskeyError, MissingRequiredArgumentError, ValidationError)): + raise + raise PasskeyError(PasskeyErrorCode.CHALLENGE_FAILED, "Passkey login challenge failed", e) from e + + async def signin_with_passkey( + self, + auth_session: str, + authn_response: PasskeyAuthResponse, + store_options: Optional[dict[str, Any]] = None, + connection: Optional[str] = None, + organization: Optional[str] = None, + scope: Optional[str] = None, + audience: Optional[str] = None, + dpop_key: Optional["jwk.JWK"] = None, + ) -> PasskeyLoginResult: + """ + Completes passkey authentication by exchanging the WebAuthn assertion + for tokens and establishing a server-side session. + + This is step 2 of 2: call passkey_signup_challenge or passkey_login_challenge + first to obtain auth_session and the WebAuthn challenge options. + + Uses Content-Type: application/json (required for nested authn_response). + Persists the session to the state store (same as complete_interactive_login). + + Args: + auth_session: Session credential from passkey_signup_challenge or passkey_login_challenge. + authn_response: Serialized WebAuthn credential from navigator.credentials.create/get. + store_options: Options passed to the state store (e.g., request/response for cookies). + Passed through to the store on every call. + connection: Auth0 database connection name (realm). + organization: Auth0 organization ID or name. + scope: OAuth2 scope string. + audience: Target API audience. + dpop_key: Optional EC P-256 JWK for DPoP-bound token exchange. When provided, + attaches a DPoP proof header so Auth0 issues a DPoP-bound token + (token_type: DPoP). Required when the tenant mandates DPoP binding. + + Returns: + PasskeyLoginResult containing state_data with user claims and token sets, + consistent with complete_interactive_login and login_with_custom_token_exchange. + + Raises: + MissingRequiredArgumentError: If auth_session or authn_response is missing. + PasskeyError: If token exchange or session creation fails. + """ + if not auth_session: + raise MissingRequiredArgumentError("auth_session") + if authn_response is None: + raise MissingRequiredArgumentError("authn_response") + + try: + domain = await self._resolve_current_domain(store_options) + metadata = await self._get_oidc_metadata_cached(domain) + + token_endpoint = metadata.get("token_endpoint") + if not token_endpoint: + raise PasskeyError(PasskeyErrorCode.TOKEN_EXCHANGE_FAILED, "Token endpoint missing in OIDC metadata") + + body: dict[str, Any] = { + "grant_type": self.GRANT_TYPE_PASSKEY, + "client_id": self._client_id, + "auth_session": auth_session, + "authn_response": authn_response.model_dump(by_alias=True, exclude_none=True), + } + if self._client_secret: + body["client_secret"] = self._client_secret + if connection: + body["realm"] = connection + if organization: + body["organization"] = organization + if scope: + body["scope"] = scope + if audience: + body["audience"] = audience + + async with self._get_http_client() as client: + headers = {} + if dpop_key is not None: + headers["DPoP"] = make_dpop_proof_for_token_endpoint( + dpop_key, "POST", token_endpoint + ) + response = await client.post(token_endpoint, json=body, headers=headers) + + # RFC 9449 §8.2 — nonce retry for DPoP token endpoint calls + if ( + dpop_key is not None + and response.status_code == 401 + and response.headers.get("DPoP-Nonce") + ): + nonce = response.headers["DPoP-Nonce"] + headers["DPoP"] = make_dpop_proof_for_token_endpoint( + dpop_key, "POST", token_endpoint, nonce=nonce + ) + response = await client.post(token_endpoint, json=body, headers=headers) + + if response.status_code != 200: + try: + error_data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.TOKEN_EXCHANGE_FAILED, + f"Passkey token exchange failed with status {response.status_code}", + ) + raise PasskeyError( + error_data.get("error", PasskeyErrorCode.TOKEN_EXCHANGE_FAILED), + error_data.get("error_description", "Passkey token exchange failed"), + ) + + try: + token_data = response.json() + except (json.JSONDecodeError, ValueError): + raise PasskeyError( + PasskeyErrorCode.INVALID_RESPONSE, "Failed to parse passkey token response as JSON" + ) + + if "expires_in" in token_data and "expires_at" not in token_data: + token_data["expires_at"] = int(time.time()) + token_data["expires_in"] + + token_response = PasskeyTokenResponse.model_validate(token_data) + + if dpop_key is not None and token_response.token_type.lower() != "dpop": + raise PasskeyError( + PasskeyErrorCode.TOKEN_EXCHANGE_FAILED, + f"DPoP token binding failed: expected token_type 'DPoP', " + f"got '{token_response.token_type}'", + ) + + # Extract user claims from ID token if present + user_claims = None + sid = PKCE.generate_random_string(32) + if token_response.id_token: + jwks = await self._get_jwks_cached(domain, metadata) + try: + claims = await self._verify_and_decode_jwt( + token_response.id_token, jwks, audience=self._client_id + ) + origin_issuer = metadata.get("issuer") + if not origin_issuer: + raise IssuerValidationError( + "Issuer missing from OIDC metadata. Cannot validate ID token issuer." + ) + token_issuer = claims.get("iss", "") + if self._normalize_url(token_issuer) != self._normalize_url(origin_issuer): + raise IssuerValidationError( + "ID token issuer mismatch. Ensure your Auth0 domain is configured correctly." + ) + user_claims = UserClaims.model_validate(claims) + sid = claims.get("sid", sid) + except ValueError as e: + raise ApiError("jwks_key_not_found", str(e)) + except jwt.InvalidSignatureError as e: + raise ApiError("invalid_signature", f"ID token signature verification failed: {str(e)}", e) + except jwt.InvalidAudienceError as e: + raise ApiError("invalid_audience", f"ID token audience mismatch: {str(e)}", e) + except jwt.ExpiredSignatureError as e: + raise ApiError("token_expired", f"ID token has expired: {str(e)}", e) + except jwt.InvalidTokenError as e: + raise ApiError("invalid_token", f"ID token verification failed: {str(e)}", e) + + # Build token set and session state + token_set = TokenSet( + audience=audience or self.DEFAULT_AUDIENCE_STATE_KEY, + access_token=token_response.access_token, + scope=token_response.scope or scope or "", + expires_at=token_response.expires_at if token_response.expires_at is not None else int(time.time()) + token_response.expires_in, + ) + state_data = StateData( + user=user_claims, + id_token=token_response.id_token, + refresh_token=token_response.refresh_token, + token_sets=[token_set], + domain=domain, + internal={"sid": sid, "created_at": int(time.time())}, + ) + + await self._state_store.set(self._state_identifier, state_data, options=store_options) + + return PasskeyLoginResult(state_data=state_data.model_dump()) + + except Exception as e: + if isinstance(e, (PasskeyError, MissingRequiredArgumentError, ValidationError, ApiError, IssuerValidationError)): + raise + raise PasskeyError(PasskeyErrorCode.TOKEN_EXCHANGE_FAILED, "Passkey sign-in failed", e) from e diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 055103a..44ec918 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -5,7 +5,7 @@ from typing import Any, Literal, Optional, Union -from pydantic import BaseModel, Field, field_validator, model_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator class UserClaims(BaseModel): @@ -13,6 +13,7 @@ class UserClaims(BaseModel): User profile information as returned by Auth0. Contains standard OIDC claims about the authenticated user. """ + sub: str name: Optional[str] = None nickname: Optional[str] = None @@ -32,6 +33,7 @@ class TokenSet(BaseModel): Represents a set of tokens issued by Auth0. Contains the access token and related metadata. """ + audience: str access_token: str scope: Optional[str] = None @@ -43,6 +45,7 @@ class ConnectionTokenSet(TokenSet): Token set specific to a connection. Extends TokenSet with connection-specific information. """ + connection: str login_hint: str @@ -52,6 +55,7 @@ class InternalStateData(BaseModel): Internal data used for managing state. Not meant to be accessed directly by SDK users. """ + sid: str created_at: int @@ -61,6 +65,7 @@ class SessionData(BaseModel): Represents a user session with Auth0. Contains user information and tokens. """ + user: Optional[UserClaims] = None id_token: Optional[str] = None refresh_token: Optional[str] = None @@ -77,6 +82,7 @@ class StateData(SessionData): Complete state data stored in the state store. Extends SessionData with internal management information. """ + internal: InternalStateData @@ -85,6 +91,7 @@ class TransactionData(BaseModel): Represents data for an in-progress authentication transaction. Used during the authorization code flow to correlate requests. """ + audience: Optional[str] = None code_verifier: str app_state: Optional[Any] = None @@ -101,6 +108,7 @@ class LogoutTokenClaims(BaseModel): Claims expected in a logout token. Used for backchannel logout processing. """ + sub: str sid: str iss: Optional[str] = None @@ -111,6 +119,7 @@ class EncryptedStoreOptions(BaseModel): Options for encrypted stores. Contains the secret used for encryption. """ + secret: str @@ -119,6 +128,7 @@ class ServerClientOptionsBase(BaseModel): Base options for configuring the Auth0 server client. Contains core settings required for all clients. """ + domain: str client_id: str client_secret: str @@ -135,6 +145,7 @@ class ServerClientOptionsWithSecret(ServerClientOptionsBase): Client options using a secret for encryption. Extends base options with secret and duration settings. """ + secret: str state_absolute_duration: Optional[int] = 259200 # 3 days in seconds @@ -144,6 +155,7 @@ class StartInteractiveLoginOptions(BaseModel): Options for starting the interactive login process. Configures how the authorization request is constructed. """ + pushed_authorization_requests: Optional[bool] = False app_state: Optional[Any] = None authorization_params: Optional[dict[str, Any]] = None @@ -154,6 +166,7 @@ class LogoutOptions(BaseModel): Options for logout operations. Configures how the logout request is constructed. """ + return_to: Optional[str] = None @@ -162,6 +175,7 @@ class AuthorizationParameters(BaseModel): Parameters used in authorization requests. Based on standard OAuth2/OIDC parameters. """ + scope: Optional[str] = None audience: Optional[str] = None redirect_uri: Optional[str] = None @@ -169,11 +183,13 @@ class AuthorizationParameters(BaseModel): class Config: extra = "allow" # Allow additional OAuth parameters + class AuthorizationDetails(BaseModel): """ Authorization details returned from Auth0. Used for Resource Access Rights (RAR). """ + type: str actions: Optional[list[str]] = None locations: Optional[list[str]] = None @@ -188,6 +204,7 @@ class LoginBackchannelOptions(BaseModel): """ Options for Client-Initiated Backchannel Authentication. """ + binding_message: str login_hint: dict[str, str] # Should contain a 'sub' field authorization_params: Optional[dict[str, Any]] = None @@ -200,6 +217,7 @@ class LoginBackchannelResult(BaseModel): """ Result from Client-Initiated Backchannel Authentication. """ + authorization_details: Optional[list[AuthorizationDetails]] = None @@ -207,19 +225,23 @@ class AccessTokenForConnectionOptions(BaseModel): """ Options for retrieving an access token for a specific connection. """ + connection: str login_hint: Optional[str] = None + class StartLinkUserOptions(BaseModel): connection: str connection_scope: Optional[str] = None authorization_params: Optional[dict[str, Any]] = None app_state: Optional[Any] = None + # ============================================================================= # Multiple Custom Domain # ============================================================================= + class DomainResolverContext(BaseModel): """ Context passed to domain resolver function for MCD support. @@ -236,13 +258,16 @@ async def domain_resolver(context: DomainResolverContext) -> str: host = context.request_headers.get('host', '').split(':')[0] return DOMAIN_MAP.get(host, DEFAULT_DOMAIN) """ + request_url: Optional[str] = None request_headers: Optional[dict[str, str]] = None + # ============================================================================= # Custom Token Exchange Types # ============================================================================= + class CustomTokenExchangeOptions(BaseModel): """ Options for custom token exchange (RFC 8693). @@ -257,6 +282,7 @@ class CustomTokenExchangeOptions(BaseModel): organization: Organization identifier for the token exchange (optional) authorization_params: Additional OAuth parameters (optional) """ + subject_token: str = Field(..., min_length=1) subject_token_type: str = Field(..., min_length=1) audience: Optional[str] = None @@ -266,7 +292,7 @@ class CustomTokenExchangeOptions(BaseModel): organization: Optional[str] = None authorization_params: Optional[dict[str, Any]] = None - @field_validator('subject_token', 'actor_token') + @field_validator("subject_token", "actor_token") @classmethod def validate_token_format(cls, v: Optional[str]) -> Optional[str]: """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" @@ -277,8 +303,8 @@ def validate_token_format(cls, v: Optional[str]) -> Optional[str]: raise ValueError("Token should not include 'Bearer ' prefix") return v - @model_validator(mode='after') - def validate_actor_token_type(self) -> 'CustomTokenExchangeOptions': + @model_validator(mode="after") + def validate_actor_token_type(self) -> "CustomTokenExchangeOptions": """Ensure actor_token_type is provided if actor_token is present.""" if self.actor_token and not self.actor_token_type: raise ValueError("actor_token_type is required when actor_token is provided") @@ -298,6 +324,7 @@ class TokenExchangeResponse(BaseModel): id_token: OpenID Connect ID token (optional) refresh_token: Refresh token (optional) """ + access_token: str token_type: str = "Bearer" expires_in: int @@ -313,6 +340,7 @@ class LoginWithCustomTokenExchangeOptions(BaseModel): Combines token exchange parameters with session management. """ + subject_token: str = Field(..., min_length=1) subject_token_type: str = Field(..., min_length=1) audience: Optional[str] = None @@ -322,7 +350,7 @@ class LoginWithCustomTokenExchangeOptions(BaseModel): organization: Optional[str] = None authorization_params: Optional[dict[str, Any]] = None - @field_validator('subject_token', 'actor_token') + @field_validator("subject_token", "actor_token") @classmethod def validate_token_format(cls, v: Optional[str]) -> Optional[str]: """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" @@ -333,8 +361,8 @@ def validate_token_format(cls, v: Optional[str]) -> Optional[str]: raise ValueError("Token should not include 'Bearer ' prefix") return v - @model_validator(mode='after') - def validate_actor_token_type(self) -> 'LoginWithCustomTokenExchangeOptions': + @model_validator(mode="after") + def validate_actor_token_type(self) -> "LoginWithCustomTokenExchangeOptions": """Ensure actor_token_type is provided if actor_token is present.""" if self.actor_token and not self.actor_token_type: raise ValueError("actor_token_type is required when actor_token is provided") @@ -347,13 +375,28 @@ class LoginWithCustomTokenExchangeResult(BaseModel): Contains session data established after token exchange. """ + state_data: dict[str, Any] authorization_details: Optional[list[AuthorizationDetails]] = None + +class PasskeyLoginResult(BaseModel): + """ + Result from signin_with_passkey. + + Contains the session data established after the webauthn token exchange. + Mirrors LoginWithCustomTokenExchangeResult — passkey sign-in is a complete + login ceremony and creates a server-side session like every other login path. + """ + + state_data: dict[str, Any] + + # ============================================================================= # Connected Accounts Types # ============================================================================= + # BASE & SHARED class ConnectedAccountBase(BaseModel): id: str @@ -363,6 +406,7 @@ class ConnectedAccountBase(BaseModel): created_at: str expires_at: Optional[str] = None + # ENTITIES (What exists) class ConnectedAccount(ConnectedAccountBase): id: str @@ -381,6 +425,7 @@ class ConnectedAccountConnection(BaseModel): # Connect Operations (How to connect) + class ConnectAccountOptions(BaseModel): connection: str redirect_uri: Optional[str] = None @@ -388,38 +433,45 @@ class ConnectAccountOptions(BaseModel): app_state: Optional[Any] = None authorization_params: Optional[dict[str, Any]] = None + class ConnectAccountRequest(BaseModel): connection: str scopes: Optional[list[str]] = None redirect_uri: Optional[str] = None state: Optional[str] = None code_challenge: Optional[str] = None - code_challenge_method: Optional[str] = 'S256' + code_challenge_method: Optional[str] = "S256" authorization_params: Optional[dict[str, Any]] = None + class ConnectParams(BaseModel): ticket: str + class ConnectAccountResponse(BaseModel): auth_session: str connect_uri: str connect_params: ConnectParams expires_in: int + class CompleteConnectAccountRequest(BaseModel): auth_session: str connect_code: str redirect_uri: str code_verifier: Optional[str] = None + class CompleteConnectAccountResponse(ConnectedAccountBase): app_state: Optional[Any] = None + # Manage operations class ListConnectedAccountsResponse(BaseModel): accounts: list[ConnectedAccount] next: Optional[str] = None + class ListConnectedAccountConnectionsResponse(BaseModel): connections: list[ConnectedAccountConnection] next: Optional[str] = None @@ -437,6 +489,7 @@ class ListConnectedAccountConnectionsResponse(BaseModel): class AuthenticatorResponse(BaseModel): """Represents an MFA authenticator enrolled by a user.""" + id: str authenticator_type: AuthenticatorType active: bool @@ -450,14 +503,17 @@ class AuthenticatorResponse(BaseModel): # Enrollment Options + class EnrollOtpOptions(BaseModel): """Options for enrolling an OTP authenticator.""" + authenticator_types: list[str] mfa_token: str class EnrollOobOptions(BaseModel): """Options for enrolling an OOB authenticator (SMS, Voice, Push).""" + authenticator_types: list[str] oob_channels: list[OobChannel] phone_number: Optional[str] = None @@ -466,6 +522,7 @@ class EnrollOobOptions(BaseModel): class EnrollEmailOptions(BaseModel): """Options for enrolling an email authenticator.""" + authenticator_types: list[str] oob_channels: list[OobChannel] email: Optional[str] = None @@ -477,8 +534,10 @@ class EnrollEmailOptions(BaseModel): # Enrollment Responses + class OtpEnrollmentResponse(BaseModel): """Response when enrolling an OTP authenticator.""" + authenticator_type: Literal["otp"] secret: str barcode_uri: str @@ -488,6 +547,7 @@ class OtpEnrollmentResponse(BaseModel): class OobEnrollmentResponse(BaseModel): """Response when enrolling an OOB authenticator.""" + authenticator_type: Literal["oob"] oob_channel: OobChannel oob_code: Optional[str] = None @@ -502,8 +562,10 @@ class OobEnrollmentResponse(BaseModel): # Challenge Types + class ChallengeOptions(BaseModel): """Options for initiating an MFA challenge.""" + challenge_type: ChallengeType authenticator_id: Optional[str] = None mfa_token: str @@ -511,6 +573,7 @@ class ChallengeOptions(BaseModel): class ChallengeResponse(BaseModel): """Response from initiating an MFA challenge.""" + challenge_type: ChallengeType oob_code: Optional[str] = None binding_method: Optional[str] = None @@ -519,21 +582,26 @@ class ChallengeResponse(BaseModel): # List Options + class ListAuthenticatorsOptions(BaseModel): """Options for listing MFA authenticators.""" + mfa_token: str # Verify Types + class VerifyOtpOptions(BaseModel): """Verify with OTP code.""" + mfa_token: str otp: str class VerifyOobOptions(BaseModel): """Verify with OOB code + binding code.""" + mfa_token: str oob_code: str binding_code: str @@ -541,6 +609,7 @@ class VerifyOobOptions(BaseModel): class VerifyRecoveryCodeOptions(BaseModel): """Verify with recovery code.""" + mfa_token: str recovery_code: str @@ -550,6 +619,7 @@ class VerifyRecoveryCodeOptions(BaseModel): class MfaVerifyResponse(BaseModel): """Response from MFA verification.""" + access_token: str token_type: str = "Bearer" expires_in: int @@ -562,24 +632,218 @@ class MfaVerifyResponse(BaseModel): # MFA Requirements + class MfaRequirement(BaseModel): """A single MFA requirement entry.""" + type: str class MfaRequirements(BaseModel): """MFA requirements from an mfa_required error response.""" + enroll: Optional[list[MfaRequirement]] = None challenge: Optional[list[MfaRequirement]] = None # MFA Token Context (for encrypted storage) + class MfaTokenContext(BaseModel): """Internal context stored inside encrypted mfa_token.""" + mfa_token: str audience: str scope: str mfa_requirements: Optional[MfaRequirements] = None created_at: int + +# ============================================================================= +# Passkey & MyAccount Authentication Methods Types +# ============================================================================= + + +class PasskeyRpInfo(BaseModel): + id: str + name: str + + +class PasskeyUserInfo(BaseModel): + model_config = ConfigDict(populate_by_name=True) + id: str + name: str + display_name: Optional[str] = Field(None, alias="displayName") + + +class PasskeyPubKeyCredParam(BaseModel): + type: str + alg: int + + +class PasskeyAuthenticatorSelection(BaseModel): + model_config = ConfigDict(populate_by_name=True) + resident_key: Optional[str] = Field(None, alias="residentKey") + user_verification: Optional[str] = Field(None, alias="userVerification") + + +class PasskeyPublicKeyOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, extra="allow") + challenge: str + rp: Optional[PasskeyRpInfo] = None + rp_id: Optional[str] = Field(None, alias="rpId") + user: Optional[PasskeyUserInfo] = None + pub_key_cred_params: Optional[list[PasskeyPubKeyCredParam]] = Field( + None, alias="pubKeyCredParams" + ) + authenticator_selection: Optional[PasskeyAuthenticatorSelection] = Field( + None, alias="authenticatorSelection" + ) + timeout: Optional[int] = None + user_verification: Optional[str] = Field(None, alias="userVerification") + + +EnrollmentType = Literal["passkey", "email", "phone", "totp", "push-notification", "recovery-code", "password"] +PreferredAuthMethod = Literal["sms", "voice"] + + +class EnrollAuthenticationMethodRequest(BaseModel): + type: EnrollmentType + email: Optional[str] = None + phone_number: Optional[str] = None + preferred_authentication_method: Optional[PreferredAuthMethod] = None + user_identity_id: Optional[str] = None + connection: Optional[str] = None + + +class EnrollmentChallengeResponse(BaseModel): + model_config = ConfigDict(extra="allow") + authentication_method_id: str + auth_session: str + authn_params_public_key: Optional[PasskeyPublicKeyOptions] = None + + def __repr__(self) -> str: + return ( + f"EnrollmentChallengeResponse(" + f"authentication_method_id={self.authentication_method_id!r}, " + f"auth_session=[REDACTED], " + f"authn_params_public_key={self.authn_params_public_key!r})" + ) + + +class PasskeyAuthResponse(BaseModel): + model_config = ConfigDict(populate_by_name=True) + id: str + raw_id: str = Field(alias="rawId") + type: str + authenticator_attachment: Optional[str] = Field(None, alias="authenticatorAttachment") + response: dict[str, str] + client_extension_results: Optional[dict[str, Any]] = Field(None, alias="clientExtensionResults") + + +class VerifyAuthenticationMethodRequest(BaseModel): + auth_session: str + authn_response: Optional[PasskeyAuthResponse] = None + otp_code: Optional[str] = None + recovery_code: Optional[str] = None + password: Optional[str] = None + + +class AuthenticationMethod(BaseModel): + model_config = ConfigDict(extra="allow") + + id: str + type: str + created_at: str + confirmed: Optional[bool] = None + usage: Optional[list[str]] = None + identity_user_id: Optional[str] = None + credential_device_type: Optional[str] = None + credential_backed_up: Optional[bool] = None + key_id: Optional[str] = None + public_key: Optional[str] = None + transports: Optional[list[str]] = None + user_agent: Optional[str] = None + user_handle: Optional[str] = None + aaguid: Optional[str] = None + relying_party_id: Optional[str] = None + phone_number: Optional[str] = None + preferred_authentication_method: Optional[str] = None + email: Optional[str] = None + name: Optional[str] = None + last_password_reset: Optional[str] = None + + +class UpdateAuthenticationMethodRequest(BaseModel): + name: Optional[str] = None + preferred_authentication_method: Optional[str] = None + + +class ListAuthenticationMethodsResponse(BaseModel): + authentication_methods: list[AuthenticationMethod] + + +class Factor(BaseModel): + model_config = ConfigDict(extra="allow") + name: str + enabled: Optional[bool] = None + trial_expired: Optional[bool] = None + + +class GetFactorsResponse(BaseModel): + factors: list[Factor] + + +class PasskeyUserProfile(BaseModel): + model_config = ConfigDict(extra="allow") + email: Optional[str] = None + name: Optional[str] = None + username: Optional[str] = None + phone_number: Optional[str] = None + given_name: Optional[str] = None + family_name: Optional[str] = None + nickname: Optional[str] = None + picture: Optional[str] = None + + +class _PasskeyChallengeResponseBase(BaseModel): + auth_session: str + authn_params_public_key: PasskeyPublicKeyOptions + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}(" + f"auth_session=[REDACTED], " + f"authn_params_public_key={self.authn_params_public_key!r})" + ) + + +class PasskeySignupChallengeResponse(_PasskeyChallengeResponseBase): + pass + + +class PasskeyLoginChallengeResponse(_PasskeyChallengeResponseBase): + pass + + +class PasskeyTokenResponse(BaseModel): + model_config = ConfigDict(extra="allow") + access_token: str + token_type: str = "Bearer" + expires_in: int + expires_at: Optional[int] = None + scope: Optional[str] = None + id_token: Optional[str] = None + refresh_token: Optional[str] = None + + def __repr__(self) -> str: + return ( + f"PasskeyTokenResponse(" + f"token_type={self.token_type!r}, " + f"expires_in={self.expires_in!r}, " + f"expires_at={self.expires_at!r}, " + f"scope={self.scope!r}, " + f"access_token=[REDACTED], " + f"id_token=[REDACTED], " + f"refresh_token=[REDACTED])" + ) diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index db4f28e..615c112 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -229,6 +229,24 @@ class CustomTokenExchangeErrorCode: INVALID_RESPONSE = "invalid_response" +class PasskeyError(Auth0Error): + """ + Error raised during passkey authentication operations. + """ + def __init__(self, code: str, message: str, cause=None): + super().__init__(message) + self.code = code + self.name = "PasskeyError" + self.cause = cause + + +class PasskeyErrorCode: + """Error codes for passkey operations.""" + CHALLENGE_FAILED = "passkey_challenge_error" + TOKEN_EXCHANGE_FAILED = "passkey_token_error" + INVALID_RESPONSE = "invalid_response" + + # ============================================================================= # MFA Error Classes # ============================================================================= diff --git a/src/auth0_server_python/tests/test_dpop_auth.py b/src/auth0_server_python/tests/test_dpop_auth.py new file mode 100644 index 0000000..b6beb69 --- /dev/null +++ b/src/auth0_server_python/tests/test_dpop_auth.py @@ -0,0 +1,145 @@ +import base64 +import hashlib +import json + +import httpx +import pytest +from jwcrypto import jwk + +from auth0_server_python.auth_schemes.bearer_auth import BearerAuth +from auth0_server_python.auth_schemes.dpop_auth import DPoPAuth, _base64url +from auth0_server_python.auth_server.my_account_client import _make_auth + + +@pytest.fixture +def ec_key(): + return jwk.JWK.generate(kty="EC", crv="P-256") + + +def _decode_jwt_parts(token: str) -> tuple[dict, dict]: + parts = token.split(".") + header = json.loads(base64.urlsafe_b64decode(parts[0] + "==")) + payload = json.loads(base64.urlsafe_b64decode(parts[1] + "==")) + return header, payload + + +def test_dpop_headers_set(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + request = httpx.Request("POST", "https://example.com/me/v1/authentication-methods") + flow = auth.auth_flow(request) + modified = next(flow) + + assert modified.headers["Authorization"] == "DPoP test_token" + assert "DPoP" in modified.headers + assert "Bearer" not in modified.headers["Authorization"] + + +def test_dpop_proof_structure(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + request = httpx.Request("POST", "https://example.com/me/v1/authentication-methods") + flow = auth.auth_flow(request) + modified = next(flow) + + proof = modified.headers["DPoP"] + header, payload = _decode_jwt_parts(proof) + + assert header["typ"] == "dpop+jwt" + assert header["alg"] == "ES256" + assert "jwk" in header + assert header["jwk"]["kty"] == "EC" + assert header["jwk"]["crv"] == "P-256" + + assert "jti" in payload + assert payload["htm"] == "POST" + assert payload["htu"] == "https://example.com/me/v1/authentication-methods" + assert "iat" in payload + assert "ath" in payload + + +def test_dpop_htm_binding(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + + get_request = httpx.Request("GET", "https://example.com/me/v1/factors") + flow = auth.auth_flow(get_request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + assert payload["htm"] == "GET" + + post_request = httpx.Request("post", "https://example.com/me/v1/factors") + flow = auth.auth_flow(post_request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + assert payload["htm"] == "POST" + + +def test_dpop_htu_strips_query_and_fragment(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + request = httpx.Request("GET", "https://example.com/me/v1/factors?foo=bar#section") + flow = auth.auth_flow(request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + assert payload["htu"] == "https://example.com/me/v1/factors" + + +def test_dpop_htu_preserves_port(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + request = httpx.Request("GET", "https://example.com:8443/me/v1/factors") + flow = auth.auth_flow(request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + assert payload["htu"] == "https://example.com:8443/me/v1/factors" + + +def test_dpop_ath_binding(ec_key): + token = "my_access_token_value" + auth = DPoPAuth(token=token, key=ec_key) + request = httpx.Request("GET", "https://example.com/me/v1/factors") + flow = auth.auth_flow(request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + + expected_ath = _base64url(hashlib.sha256(token.encode("ascii")).digest()) + assert payload["ath"] == expected_ath + + +def test_dpop_proof_uniqueness(ec_key): + auth = DPoPAuth(token="test_token", key=ec_key) + jtis = set() + for _ in range(10): + request = httpx.Request("GET", "https://example.com/me/v1/factors") + flow = auth.auth_flow(request) + modified = next(flow) + _, payload = _decode_jwt_parts(modified.headers["DPoP"]) + jtis.add(payload["jti"]) + + assert len(jtis) == 10 + + +def test_dpop_repr_redacts_credentials(ec_key): + auth = DPoPAuth(token="secret_access_token_value", key=ec_key) + assert "secret_access_token_value" not in repr(auth) + assert "secret_access_token_value" not in str(auth) + assert "[REDACTED]" in repr(auth) + assert "[REDACTED]" in str(auth) + + +def test_dpop_rejects_non_ec_key(): + rsa_key = jwk.JWK.generate(kty="RSA", size=2048) + with pytest.raises(ValueError, match="EC P-256"): + DPoPAuth(token="token", key=rsa_key) + + +def test_dpop_rejects_wrong_curve(): + p384_key = jwk.JWK.generate(kty="EC", crv="P-384") + with pytest.raises(ValueError, match="EC P-256"): + DPoPAuth(token="token", key=p384_key) + + +def test_make_auth_bearer_fallback(): + auth = _make_auth("token123", dpop_key=None) + assert isinstance(auth, BearerAuth) + + +def test_make_auth_dpop_when_key_provided(ec_key): + auth = _make_auth("token123", dpop_key=ec_key) + assert isinstance(auth, DPoPAuth) diff --git a/src/auth0_server_python/tests/test_my_account_client.py b/src/auth0_server_python/tests/test_my_account_client.py index e4ff74c..6f254c1 100644 --- a/src/auth0_server_python/tests/test_my_account_client.py +++ b/src/auth0_server_python/tests/test_my_account_client.py @@ -1,9 +1,13 @@ from unittest.mock import ANY, AsyncMock, MagicMock +import httpx import pytest +from jwcrypto import jwk as jwk_module +from auth0_server_python.auth_schemes.dpop_auth import DPoPAuth from auth0_server_python.auth_server.my_account_client import MyAccountClient from auth0_server_python.auth_types import ( + AuthenticationMethod, CompleteConnectAccountRequest, CompleteConnectAccountResponse, ConnectAccountRequest, @@ -11,10 +15,18 @@ ConnectedAccount, ConnectedAccountConnection, ConnectParams, + EnrollAuthenticationMethodRequest, + EnrollmentChallengeResponse, + GetFactorsResponse, + ListAuthenticationMethodsResponse, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + PasskeyAuthResponse, + UpdateAuthenticationMethodRequest, + VerifyAuthenticationMethodRequest, ) from auth0_server_python.error import ( + ApiError, InvalidArgumentError, MissingRequiredArgumentError, MyAccountApiError, @@ -502,3 +514,899 @@ async def test_list_connected_account_connections_api_response_failure(mocker): mock_get.assert_awaited_once() assert "Invalid Token" in str(exc.value) + +# ============================================================================= +# AUTHENTICATION METHODS & FACTORS (Passkey / MyAccount API) +# ============================================================================= + + +@pytest.mark.asyncio +async def test_get_factors_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"factors": [{"name": "sms", "enabled": True}]}) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.get_factors(access_token="token123") + + assert isinstance(result, GetFactorsResponse) + assert len(result.factors) == 1 + assert result.factors[0].name == "sms" + assert result.factors[0].enabled is True + + +@pytest.mark.asyncio +@pytest.mark.parametrize("access_token", [None, ""]) +async def test_get_factors_missing_access_token(mocker, access_token): + client = MyAccountClient(domain="auth0.local") + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock) + + with pytest.raises(MissingRequiredArgumentError): + await client.get_factors(access_token=access_token) + + mock_get.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_factors_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "title": "Forbidden", + "type": "forbidden", + "detail": "Insufficient scope", + "status": 403, + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MyAccountApiError) as exc: + await client.get_factors(access_token="token123") + + assert exc.value.status == 403 + + +@pytest.mark.asyncio +async def test_get_factors_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.get", new_callable=AsyncMock, side_effect=Exception("Connection refused") + ) + + with pytest.raises(ApiError): + await client.get_factors(access_token="token123") + + +@pytest.mark.asyncio +async def test_get_factors_empty_list(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"factors": []}) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.get_factors(access_token="token123") + assert result.factors == [] + + +@pytest.mark.asyncio +async def test_get_factors_extra_fields(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "factors": [{"name": "webauthn-roaming", "enabled": True, "future_field": "value"}] + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.get_factors(access_token="token123") + assert result.factors[0].name == "webauthn-roaming" + + +@pytest.mark.asyncio +async def test_list_authentication_methods_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "authentication_methods": [ + {"id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z", "key_id": "kid1"} + ] + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.list_authentication_methods(access_token="token123") + assert isinstance(result, ListAuthenticationMethodsResponse) + assert len(result.authentication_methods) == 1 + assert result.authentication_methods[0].type == "passkey" + + +@pytest.mark.asyncio +async def test_list_authentication_methods_with_type_filter(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"authentication_methods": []}) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + await client.list_authentication_methods(access_token="token123", type_filter="passkey") + mock_get.assert_awaited_once() + call_kwargs = mock_get.call_args[1] + assert call_kwargs["params"] == {"type": "passkey"} + + +@pytest.mark.asyncio +async def test_list_authentication_methods_empty(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"authentication_methods": []}) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.list_authentication_methods(access_token="token123") + assert result.authentication_methods == [] + + +@pytest.mark.asyncio +async def test_get_authentication_method_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + result = await client.get_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + assert isinstance(result, AuthenticationMethod) + assert result.id == "am_1" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("method_id", [None, ""]) +async def test_get_authentication_method_missing_id(mocker, method_id): + client = MyAccountClient(domain="auth0.local") + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock) + + with pytest.raises(MissingRequiredArgumentError): + await client.get_authentication_method( + access_token="token123", authentication_method_id=method_id + ) + + mock_get.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_authentication_method_path_traversal(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "id/slash", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + await client.get_authentication_method( + access_token="token123", authentication_method_id="id/slash" + ) + call_url = mock_get.call_args[1]["url"] + assert "id%2Fslash" in call_url + assert "id/slash" not in call_url.replace("https://auth0.local/me/", "") + + +@pytest.mark.asyncio +async def test_get_authentication_method_pipe_encoding(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "passkey|new", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + await client.get_authentication_method( + access_token="token123", authentication_method_id="passkey|new" + ) + call_url = mock_get.call_args[1]["url"] + assert "passkey%7Cnew" in call_url + + +@pytest.mark.asyncio +async def test_delete_authentication_method_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 204 + mocker.patch("httpx.AsyncClient.delete", new_callable=AsyncMock, return_value=response) + + result = await client.delete_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + assert result is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize("method_id", [None, ""]) +async def test_delete_authentication_method_missing_id(mocker, method_id): + client = MyAccountClient(domain="auth0.local") + mock_delete = mocker.patch("httpx.AsyncClient.delete", new_callable=AsyncMock) + + with pytest.raises(MissingRequiredArgumentError): + await client.delete_authentication_method( + access_token="token123", authentication_method_id=method_id + ) + + mock_delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_update_authentication_method_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z", "name": "My Key", + }) + mock_patch = mocker.patch( + "httpx.AsyncClient.patch", new_callable=AsyncMock, return_value=response + ) + + req = UpdateAuthenticationMethodRequest(name="My Key") + result = await client.update_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req + ) + assert result.name == "My Key" + call_kwargs = mock_patch.call_args[1] + assert call_kwargs["json"] == {"name": "My Key"} + + +@pytest.mark.asyncio +async def test_update_authentication_method_missing_request(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch("httpx.AsyncClient.patch", new_callable=AsyncMock) + + with pytest.raises(MissingRequiredArgumentError): + await client.update_authentication_method( + access_token="token123", authentication_method_id="am_1", request=None + ) + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/passkey|new"} + response.json = MagicMock(return_value={ + "auth_session": "session_abc", + "authn_params_public_key": { + "challenge": "dGVzdA", + "rp": {"id": "auth0.local", "name": "My App"}, + "user": {"id": "dXNlcl8x", "name": "user@test.com", "displayName": "Test User"}, + "pubKeyCredParams": [{"type": "public-key", "alg": -7}], + "authenticatorSelection": {"residentKey": "required", "userVerification": "preferred"}, + "timeout": 60000, + }, + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + result = await client.enroll_authentication_method(access_token="token123", request=req) + + assert isinstance(result, EnrollmentChallengeResponse) + assert result.authentication_method_id == "passkey|new" + assert result.auth_session == "session_abc" + assert result.authn_params_public_key is not None + assert result.authn_params_public_key.pub_key_cred_params[0].alg == -7 + assert result.authn_params_public_key.authenticator_selection.resident_key == "required" + assert result.authn_params_public_key.user.display_name == "Test User" + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_public_key_extra_fields_preserved(mocker): + """Unknown WebAuthn fields (excludeCredentials, attestation, extensions) must not be dropped.""" + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/passkey|new"} + response.json = MagicMock(return_value={ + "auth_session": "session_abc", + "authn_params_public_key": { + "challenge": "dGVzdA", + "rp": {"id": "auth0.local", "name": "My App"}, + "user": {"id": "dXNlcl8x", "name": "user@test.com"}, + "pubKeyCredParams": [{"type": "public-key", "alg": -7}], + "excludeCredentials": [{"type": "public-key", "id": "Y3JlZA"}], + "attestation": "direct", + "extensions": {"appid": "https://auth0.local"}, + }, + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + result = await client.enroll_authentication_method(access_token="token123", request=req) + + pk = result.authn_params_public_key + assert pk.model_extra["excludeCredentials"] == [{"type": "public-key", "id": "Y3JlZA"}] + assert pk.model_extra["attestation"] == "direct" + assert pk.model_extra["extensions"] == {"appid": "https://auth0.local"} + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_missing_location(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {} + response.json = MagicMock(return_value={"auth_session": "session_abc"}) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + with pytest.raises(ApiError) as exc: + await client.enroll_authentication_method(access_token="token123", request=req) + + assert "Location header" in str(exc.value) + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_location_with_query(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/abc123?tracking=1"} + response.json = MagicMock(return_value={"auth_session": "session_abc"}) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + result = await client.enroll_authentication_method(access_token="token123", request=req) + assert result.authentication_method_id == "abc123" + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_location_absolute_url(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "https://tenant.auth0.com/me/v1/authentication-methods/am_xyz"} + response.json = MagicMock(return_value={"auth_session": "session_abc"}) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + result = await client.enroll_authentication_method(access_token="token123", request=req) + assert result.authentication_method_id == "am_xyz" + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_totp_preserves_secret(mocker): + """TOTP enrollment response includes totp_secret and barcode_uri — must not be dropped.""" + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/totp|new"} + response.json = MagicMock(return_value={ + "auth_session": "session_totp", + "totp_secret": "JBSWY3DPEHPK3PXP", + "barcode_uri": "otpauth://totp/Example:alice@example.com?secret=JBSWY3DPEHPK3PXP", + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="totp") + result = await client.enroll_authentication_method(access_token="token123", request=req) + + assert result.authentication_method_id == "totp|new" + assert result.auth_session == "session_totp" + assert result.model_extra["totp_secret"] == "JBSWY3DPEHPK3PXP" + assert result.model_extra["barcode_uri"].startswith("otpauth://") + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_oob_preserves_oob_code(mocker): + """OOB (email/phone) enrollment response includes oob_code — must not be dropped.""" + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/email|new"} + response.json = MagicMock(return_value={ + "auth_session": "session_oob", + "oob_code": "oob_abc123", + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="email") + result = await client.enroll_authentication_method(access_token="token123", request=req) + + assert result.authentication_method_id == "email|new" + assert result.model_extra["oob_code"] == "oob_abc123" + + +@pytest.mark.asyncio +async def test_verify_authentication_method_success(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 201 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z", "confirmed": True, + }) + mock_post = mocker.patch( + "httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response + ) + + authn_response = PasskeyAuthResponse( + id="cred1", + raw_id="cmF3MQ", + type="public-key", + authenticator_attachment="platform", + response={"clientDataJSON": "abc", "attestationObject": "def"}, + ) + req = VerifyAuthenticationMethodRequest( + auth_session="session_abc", authn_response=authn_response + ) + result = await client.verify_authentication_method( + access_token="token123", authentication_method_id="passkey|new", request=req + ) + + assert isinstance(result, AuthenticationMethod) + assert result.confirmed is True + + call_kwargs = mock_post.call_args[1] + body = call_kwargs["json"] + assert "rawId" in body["authn_response"] + assert "raw_id" not in body["authn_response"] + assert "authenticatorAttachment" in body["authn_response"] + assert body["auth_session"] == "session_abc" + assert "passkey%7Cnew" in call_kwargs["url"] + + +@pytest.mark.asyncio +@pytest.mark.parametrize("method_id", [None, ""]) +async def test_verify_authentication_method_missing_id(mocker, method_id): + client = MyAccountClient(domain="auth0.local") + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", otp_code="123456") + with pytest.raises(MissingRequiredArgumentError): + await client.verify_authentication_method( + access_token="token123", authentication_method_id=method_id, request=req + ) + + +def test_enrollment_challenge_response_repr(): + resp = EnrollmentChallengeResponse( + authentication_method_id="am_1", + auth_session="super_secret_session", + authn_params_public_key=None, + ) + repr_str = repr(resp) + assert "super_secret_session" not in repr_str + assert "[REDACTED]" in repr_str + assert "am_1" in repr_str + + +def test_verify_request_auth_session_only_is_valid(): + req = VerifyAuthenticationMethodRequest(auth_session="session_abc") + assert req.auth_session == "session_abc" + assert req.otp_code is None + assert req.authn_response is None + + +def test_verify_request_accepts_otp_code(): + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", otp_code="123456") + assert req.otp_code == "123456" + + +def test_verify_request_accepts_authn_response(): + authn_resp = PasskeyAuthResponse( + id="cred1", + raw_id="cmF3MQ", + type="public-key", + response={"clientDataJSON": "abc", "attestationObject": "def"}, + ) + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", authn_response=authn_resp) + assert req.authn_response is not None + + +@pytest.mark.asyncio +async def test_get_factors_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"factors": [{"name": "sms", "enabled": True}]}) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + await client.get_factors(access_token="token123", dpop_key=dpop_key) + + mock_get.assert_awaited_once() + call_kwargs = mock_get.call_args[1] + assert isinstance(call_kwargs["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_list_authentication_methods_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={"authentication_methods": []}) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + await client.list_authentication_methods(access_token="token123", dpop_key=dpop_key) + + assert isinstance(mock_get.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_get_authentication_method_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mock_get = mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + await client.get_authentication_method( + access_token="token123", authentication_method_id="am_1", dpop_key=dpop_key + ) + + assert isinstance(mock_get.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_delete_authentication_method_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 204 + mock_delete = mocker.patch( + "httpx.AsyncClient.delete", new_callable=AsyncMock, return_value=response + ) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + await client.delete_authentication_method( + access_token="token123", authentication_method_id="am_1", dpop_key=dpop_key + ) + + assert isinstance(mock_delete.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_update_authentication_method_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 200 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mock_patch = mocker.patch( + "httpx.AsyncClient.patch", new_callable=AsyncMock, return_value=response + ) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + req = UpdateAuthenticationMethodRequest(name="New Name") + await client.update_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req, dpop_key=dpop_key + ) + + assert isinstance(mock_patch.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/passkey|new"} + response.json = MagicMock(return_value={"auth_session": "session_abc"}) + mock_post = mocker.patch( + "httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response + ) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + req = EnrollAuthenticationMethodRequest(type="passkey") + await client.enroll_authentication_method( + access_token="token123", request=req, dpop_key=dpop_key + ) + + assert isinstance(mock_post.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_verify_authentication_method_with_dpop_key(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 201 + response.json = MagicMock(return_value={ + "id": "am_1", "type": "passkey", "created_at": "2026-01-01T00:00:00Z" + }) + mock_post = mocker.patch( + "httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response + ) + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", otp_code="123456") + await client.verify_authentication_method( + access_token="token123", + authentication_method_id="am_1", + request=req, + dpop_key=dpop_key, + ) + + assert isinstance(mock_post.call_args[1]["auth"], DPoPAuth) + + +@pytest.mark.asyncio +async def test_list_authentication_methods_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "title": "Forbidden", "type": "forbidden", "detail": "Insufficient scope", "status": 403, + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MyAccountApiError) as exc: + await client.list_authentication_methods(access_token="token123") + assert exc.value.status == 403 + + +@pytest.mark.asyncio +async def test_list_authentication_methods_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.get", new_callable=AsyncMock, side_effect=Exception("Connection refused") + ) + + with pytest.raises(ApiError): + await client.list_authentication_methods(access_token="token123") + + +@pytest.mark.asyncio +async def test_get_authentication_method_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 404 + response.json = MagicMock(return_value={ + "title": "Not Found", "type": "not_found", "detail": "Not found", "status": 404, + }) + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MyAccountApiError) as exc: + await client.get_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + assert exc.value.status == 404 + + +@pytest.mark.asyncio +async def test_get_authentication_method_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch("httpx.AsyncClient.get", new_callable=AsyncMock, side_effect=Exception("timeout")) + + with pytest.raises(ApiError): + await client.get_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + + +@pytest.mark.asyncio +async def test_delete_authentication_method_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 404 + response.json = MagicMock(return_value={ + "title": "Not Found", "type": "not_found", "detail": "Not found", "status": 404, + }) + mocker.patch("httpx.AsyncClient.delete", new_callable=AsyncMock, return_value=response) + + with pytest.raises(MyAccountApiError) as exc: + await client.delete_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + assert exc.value.status == 404 + + +@pytest.mark.asyncio +async def test_delete_authentication_method_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.delete", + new_callable=AsyncMock, + side_effect=Exception("Connection reset"), + ) + + with pytest.raises(ApiError): + await client.delete_authentication_method( + access_token="token123", authentication_method_id="am_1" + ) + + +@pytest.mark.asyncio +async def test_update_authentication_method_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 422 + response.json = MagicMock(return_value={ + "title": "Unprocessable", "type": "validation_error", "detail": "Invalid", "status": 422, + }) + mocker.patch("httpx.AsyncClient.patch", new_callable=AsyncMock, return_value=response) + + req = UpdateAuthenticationMethodRequest(name="x") + with pytest.raises(MyAccountApiError) as exc: + await client.update_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req + ) + assert exc.value.status == 422 + + +@pytest.mark.asyncio +async def test_update_authentication_method_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.patch", new_callable=AsyncMock, side_effect=Exception("timeout") + ) + + req = UpdateAuthenticationMethodRequest(name="x") + with pytest.raises(ApiError): + await client.update_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req + ) + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 403 + response.json = MagicMock(return_value={ + "title": "Forbidden", "type": "forbidden", "detail": "Scope missing", "status": 403, + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + with pytest.raises(MyAccountApiError) as exc: + await client.enroll_authentication_method(access_token="token123", request=req) + assert exc.value.status == 403 + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.post", + new_callable=AsyncMock, + side_effect=Exception("Connection refused"), + ) + + req = EnrollAuthenticationMethodRequest(type="passkey") + with pytest.raises(ApiError): + await client.enroll_authentication_method(access_token="token123", request=req) + + +@pytest.mark.asyncio +async def test_verify_authentication_method_api_error(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 400 + response.json = MagicMock(return_value={ + "title": "Bad Request", "type": "invalid_request", "detail": "Invalid OTP", "status": 400, + }) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", otp_code="000000") + with pytest.raises(MyAccountApiError) as exc: + await client.verify_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req + ) + assert exc.value.status == 400 + + +@pytest.mark.asyncio +async def test_verify_authentication_method_network_error(mocker): + client = MyAccountClient(domain="auth0.local") + mocker.patch( + "httpx.AsyncClient.post", + new_callable=AsyncMock, + side_effect=Exception("Connection refused"), + ) + + req = VerifyAuthenticationMethodRequest(auth_session="session_abc", otp_code="123456") + with pytest.raises(ApiError): + await client.verify_authentication_method( + access_token="token123", authentication_method_id="am_1", request=req + ) + + +@pytest.mark.asyncio +async def test_enroll_authentication_method_location_collection_url(mocker): + client = MyAccountClient(domain="auth0.local") + response = AsyncMock() + response.status_code = 202 + response.headers = {"location": "/me/v1/authentication-methods/"} + response.json = MagicMock(return_value={"auth_session": "session_abc"}) + mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=response) + + req = EnrollAuthenticationMethodRequest(type="passkey") + with pytest.raises(ApiError) as exc: + await client.enroll_authentication_method(access_token="token123", request=req) + assert "could not extract ID" in str(exc.value) + + +# ============================================================================= +# DPoP nonce retry (RFC 9449 §8.2) — tests DPoPAuth.auth_flow directly +# ============================================================================= + + +def test_dpop_auth_flow_retries_with_nonce_on_401(): + """ + DPoPAuth.auth_flow() must retry with DPoP-Nonce when server responds 401 + + DPoP-Nonce header (RFC 9449 §8.2). Tested by driving the generator directly. + """ + import base64 + import json as _json + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + auth = DPoPAuth(token="test_access_token", key=dpop_key) + + request = httpx.Request("GET", "https://auth0.local/me/v1/factors") + flow = auth.auth_flow(request) + + # First yield — initial request + first_request = next(flow) + assert "DPoP" in first_request.headers + assert "Authorization" in first_request.headers + + # First proof must not have nonce + proof1 = first_request.headers["DPoP"] + payload1_b64 = proof1.split(".")[1] + padding = 4 - len(payload1_b64) % 4 + payload1 = _json.loads(base64.urlsafe_b64decode(payload1_b64 + "=" * padding)) + assert "nonce" not in payload1 + + # Simulate 401 + DPoP-Nonce response + nonce_response = httpx.Response( + status_code=401, + headers={"DPoP-Nonce": "server-nonce-abc"}, + content=b'{"error":"use_dpop_nonce"}', + request=request, + ) + + # Second yield — retry request with nonce + try: + second_request = flow.send(nonce_response) + except StopIteration: + second_request = None + + assert second_request is not None + proof2 = second_request.headers["DPoP"] + payload2_b64 = proof2.split(".")[1] + padding = 4 - len(payload2_b64) % 4 + payload2 = _json.loads(base64.urlsafe_b64decode(payload2_b64 + "=" * padding)) + assert payload2["nonce"] == "server-nonce-abc" + + +def test_dpop_auth_flow_no_retry_on_non_401(): + """DPoPAuth.auth_flow() must NOT retry when the response is not 401.""" + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + auth = DPoPAuth(token="test_access_token", key=dpop_key) + + request = httpx.Request("GET", "https://auth0.local/me/v1/factors") + flow = auth.auth_flow(request) + next(flow) + + success_response = httpx.Response( + status_code=200, + content=b'{"factors":[]}', + request=request, + ) + + try: + flow.send(success_response) + retried = True + except StopIteration: + retried = False + + assert not retried + diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 47ba774..3c271ea 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -4816,3 +4816,1058 @@ async def _fake_fetch(self, domain): assert exc.value.mfa_requirements is not None finally: ServerClient._fetch_oidc_metadata = original_fetch + + +# ============================================================================= +# PASSKEY AUTHENTICATION +# ============================================================================= + +_PASSKEY_SIGNUP_CHALLENGE_RESPONSE = { + "auth_session": "session_abc123", + "authn_params_public_key": { + "challenge": "dGVzdC1jaGFsbGVuZ2U", + "rp": {"id": "auth0.local", "name": "Test App"}, + "user": {"id": "dXNlcl8x", "name": "user@example.com", "displayName": "Jane"}, + "pubKeyCredParams": [{"type": "public-key", "alg": -7}], + "authenticatorSelection": { + "residentKey": "required", + "userVerification": "preferred", + }, + "timeout": 60000, + }, +} + +_PASSKEY_LOGIN_CHALLENGE_RESPONSE = { + "auth_session": "session_login_xyz", + "authn_params_public_key": { + "challenge": "bG9naW4tY2hhbGxlbmdl", + "rpId": "auth0.local", + "timeout": 60000, + "userVerification": "preferred", + }, +} + +_PASSKEY_TOKEN_RESPONSE = { + "access_token": "at_passkey_123", + "id_token": "eyJ.test.jwt", + "token_type": "Bearer", + "expires_in": 86400, + "scope": "openid profile", +} + +_PASSKEY_TOKEN_RESPONSE_DPOP = { + "access_token": "at_passkey_dpop_123", + "id_token": "eyJ.test.jwt", + "token_type": "DPoP", + "expires_in": 86400, + "scope": "openid profile", +} + + +def _make_passkey_authn_response(): + from auth0_server_python.auth_types import PasskeyAuthResponse + return PasskeyAuthResponse( + id="cred_abc123", + raw_id="Y3JlZF9hYmMxMjM", + type="public-key", + authenticator_attachment="platform", + response={ + "clientDataJSON": "eyJ0eXBlIjoid2ViYXV0aG4uZ2V0In0", + "authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2M", + "signature": "MEUCIQC", + "userHandle": "dXNlcl8x", + }, + ) + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_success(mocker): + from auth0_server_python.auth_types import PasskeySignupChallengeResponse, PasskeyUserProfile + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_SIGNUP_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + result = await client.passkey_signup_challenge( + user_profile=PasskeyUserProfile(email="user@example.com", name="Jane Doe"), + connection="Username-Password-Authentication", + ) + + assert isinstance(result, PasskeySignupChallengeResponse) + assert result.auth_session == "session_abc123" + assert result.authn_params_public_key.challenge == "dGVzdC1jaGFsbGVuZ2U" + assert result.authn_params_public_key.rp.id == "auth0.local" + assert result.authn_params_public_key.user.display_name == "Jane" + assert result.authn_params_public_key.pub_key_cred_params[0].alg == -7 + assert result.authn_params_public_key.authenticator_selection.resident_key == "required" + + mock_post.assert_awaited_once() + args, kwargs = mock_post.call_args + assert "/passkey/register" in args[0] + body = kwargs["json"] + assert body["client_id"] == "test_client_id" + assert body["client_secret"] == "test_client_secret" + assert body["user_profile"]["email"] == "user@example.com" + assert body["user_profile"]["name"] == "Jane Doe" + assert body["realm"] == "Username-Password-Authentication" + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_user_profile_fields(mocker): + from auth0_server_python.auth_types import PasskeyUserProfile + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_SIGNUP_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + await client.passkey_signup_challenge( + user_profile=PasskeyUserProfile( + email="u@e.com", + username="jdoe", + phone_number="+1234567890", + given_name="Jane", + family_name="Doe", + nickname="jd", + picture="https://example.com/pic.jpg", + ), + user_metadata={"role": "admin"}, + organization="org_123", + ) + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body["user_profile"]["email"] == "u@e.com" + assert body["user_profile"]["username"] == "jdoe" + assert body["user_profile"]["phone_number"] == "+1234567890" + assert body["user_profile"]["given_name"] == "Jane" + assert body["user_profile"]["family_name"] == "Doe" + assert body["user_profile"]["nickname"] == "jd" + assert body["user_profile"]["picture"] == "https://example.com/pic.jpg" + assert "user_metadata" not in body["user_profile"] + assert body["user_metadata"] == {"role": "admin"} + assert body["organization"] == "org_123" + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_minimal_body(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_SIGNUP_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + await client.passkey_signup_challenge() + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body == {"client_id": "test_client_id", "client_secret": "test_client_secret"} + assert "user_profile" not in body + assert "user_metadata" not in body + assert "realm" not in body + assert "organization" not in body + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_user_metadata_root_level(mocker): + """user_metadata must be sent at root level, not nested inside user_profile.""" + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_SIGNUP_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + await client.passkey_signup_challenge( + user_metadata={"preferred_language": "en"}, + ) + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body["user_metadata"] == {"preferred_language": "en"} + assert "user_profile" not in body + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_api_error(mocker): + from auth0_server_python.auth_types import PasskeyUserProfile + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 403 + mock_response.json = MagicMock(return_value={ + "error": "access_denied", + "error_description": "Passkey not enabled", + }) + mock_post.return_value = mock_response + + with pytest.raises(PasskeyError) as exc: + await client.passkey_signup_challenge( + user_profile=PasskeyUserProfile(email="test@example.com") + ) + assert "access_denied" in str(exc.value) or "Passkey not enabled" in str(exc.value) + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_non_json_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 502 + mock_response.json = MagicMock(side_effect=json.JSONDecodeError("bad", "", 0)) + mock_post.return_value = mock_response + + with pytest.raises(PasskeyError) as exc: + await client.passkey_signup_challenge() + assert "502" in str(exc.value) or "passkey_challenge_error" in str(exc.value) + + +@pytest.mark.asyncio +async def test_passkey_signup_challenge_network_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_post.side_effect = Exception("Connection refused") + + with pytest.raises(PasskeyError) as exc: + await client.passkey_signup_challenge() + assert "Passkey signup challenge failed" in str(exc.value) + + +@pytest.mark.asyncio +async def test_passkey_login_challenge_success(mocker): + from auth0_server_python.auth_types import PasskeyLoginChallengeResponse + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_LOGIN_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + result = await client.passkey_login_challenge( + connection="Username-Password-Authentication", + organization="org_abc", + ) + + assert isinstance(result, PasskeyLoginChallengeResponse) + assert result.auth_session == "session_login_xyz" + assert result.authn_params_public_key.challenge == "bG9naW4tY2hhbGxlbmdl" + assert result.authn_params_public_key.rp_id == "auth0.local" + assert result.authn_params_public_key.user_verification == "preferred" + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body["client_id"] == "test_client_id" + assert body["realm"] == "Username-Password-Authentication" + assert body["organization"] == "org_abc" + assert "username" not in body + + +@pytest.mark.asyncio +async def test_passkey_login_challenge_minimal_body(mocker): + """No optional fields sent when called with no arguments.""" + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_LOGIN_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + await client.passkey_login_challenge() + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body == {"client_id": "test_client_id", "client_secret": "test_client_secret"} + assert "username" not in body + assert "realm" not in body + assert "organization" not in body + + +@pytest.mark.asyncio +async def test_passkey_login_challenge_with_username(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_LOGIN_CHALLENGE_RESPONSE) + mock_post.return_value = mock_response + + await client.passkey_login_challenge(username="jane@example.com") + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body["username"] == "jane@example.com" + + +@pytest.mark.asyncio +async def test_passkey_login_challenge_api_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 400 + mock_response.json = MagicMock(return_value={ + "error": "invalid_request", + "error_description": "Missing client_id", + }) + mock_post.return_value = mock_response + + with pytest.raises(PasskeyError): + await client.passkey_login_challenge() + + +@pytest.mark.asyncio +async def test_passkey_login_challenge_network_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_post.side_effect = Exception("timeout") + + with pytest.raises(PasskeyError): + await client.passkey_login_challenge() + + +@pytest.mark.asyncio +async def test_signin_with_passkey_success(mocker): + from auth0_server_python.auth_types import PasskeyLoginResult + state_store = AsyncMock() + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=state_store, + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "name": "Jane", "iss": "https://auth0.local/", "sid": "sid_abc" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + authn_response = _make_passkey_authn_response() + + result = await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=authn_response, + scope="openid profile", + audience="https://api.example.com", + connection="Username-Password-Authentication", + organization="org_abc", + ) + + assert isinstance(result, PasskeyLoginResult) + assert "token_sets" in result.state_data + assert result.state_data["token_sets"][0]["access_token"] == "at_passkey_123" + assert result.state_data["token_sets"][0]["audience"] == "https://api.example.com" + + # Session must be persisted + state_store.set.assert_awaited_once() + + mock_post.assert_awaited_once() + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert body["grant_type"] == "urn:okta:params:oauth:grant-type:webauthn" + assert body["client_id"] == "test_client_id" + assert body["client_secret"] == "test_client_secret" + assert body["auth_session"] == "session_xyz" + assert body["scope"] == "openid profile" + assert body["audience"] == "https://api.example.com" + assert body["realm"] == "Username-Password-Authentication" + assert body["organization"] == "org_abc" + assert body["authn_response"]["rawId"] == "Y3JlZF9hYmMxMjM" + assert body["authn_response"]["authenticatorAttachment"] == "platform" + assert "raw_id" not in body["authn_response"] + + +@pytest.mark.asyncio +async def test_signin_with_passkey_uses_json_content_type(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + await client.signin_with_passkey( + auth_session="s", + authn_response=_make_passkey_authn_response(), + ) + + args, kwargs = mock_post.call_args + assert "json" in kwargs + assert "data" not in kwargs + + +@pytest.mark.asyncio +@pytest.mark.parametrize("auth_session", [None, ""]) +async def test_signin_with_passkey_missing_auth_session(auth_session): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + with pytest.raises(MissingRequiredArgumentError): + await client.signin_with_passkey( + auth_session=auth_session, + authn_response=_make_passkey_authn_response(), + ) + + +@pytest.mark.asyncio +async def test_signin_with_passkey_missing_authn_response(): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + with pytest.raises(MissingRequiredArgumentError): + await client.signin_with_passkey( + auth_session="session_abc", + authn_response=None, + ) + + +@pytest.mark.asyncio +async def test_signin_with_passkey_api_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token"}, + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 401 + mock_response.json = MagicMock(return_value={ + "error": "invalid_grant", + "error_description": "Invalid auth_session", + }) + mock_post.return_value = mock_response + + with pytest.raises(PasskeyError) as exc: + await client.signin_with_passkey( + auth_session="expired_session", + authn_response=_make_passkey_authn_response(), + ) + assert "invalid_grant" in str(exc.value) or "Invalid auth_session" in str(exc.value) + + +@pytest.mark.asyncio +async def test_signin_with_passkey_missing_token_endpoint(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={}) + + with pytest.raises(PasskeyError) as exc: + await client.signin_with_passkey( + auth_session="session", + authn_response=_make_passkey_authn_response(), + ) + assert "token endpoint" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_signin_with_passkey_network_error(mocker): + from auth0_server_python.error import PasskeyError + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token"}, + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_post.side_effect = Exception("Connection reset") + + with pytest.raises(PasskeyError): + await client.signin_with_passkey( + auth_session="session", + authn_response=_make_passkey_authn_response(), + ) + + +@pytest.mark.asyncio +async def test_signin_with_passkey_no_client_secret(mocker): + client = ServerClient( + domain="auth0.local", + client_id="public_client", + client_secret=None, + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + from auth0_server_python.auth_types import PasskeyAuthResponse + authn_resp = PasskeyAuthResponse( + id="cred", + raw_id="cmF3", + type="public-key", + response={"clientDataJSON": "abc", "authenticatorData": "def", "signature": "ghi"}, + ) + await client.signin_with_passkey(auth_session="session", authn_response=authn_resp) + + args, kwargs = mock_post.call_args + body = kwargs["json"] + assert "client_secret" not in body + assert body["client_id"] == "public_client" + + +def test_passkey_signup_challenge_repr_redacts_auth_session(): + from auth0_server_python.auth_types import PasskeySignupChallengeResponse + resp = PasskeySignupChallengeResponse.model_validate(_PASSKEY_SIGNUP_CHALLENGE_RESPONSE) + repr_str = repr(resp) + assert "session_abc123" not in repr_str + assert "[REDACTED]" in repr_str + + +def test_passkey_login_challenge_repr_redacts_auth_session(): + from auth0_server_python.auth_types import PasskeyLoginChallengeResponse + resp = PasskeyLoginChallengeResponse.model_validate(_PASSKEY_LOGIN_CHALLENGE_RESPONSE) + repr_str = repr(resp) + assert "session_login_xyz" not in repr_str + assert "[REDACTED]" in repr_str + + +def test_passkey_token_response_repr_redacts_tokens(): + from auth0_server_python.auth_types import PasskeyTokenResponse + resp = PasskeyTokenResponse( + access_token="secret_at_value", + token_type="Bearer", + expires_in=86400, + id_token="secret_id_token", + refresh_token="secret_rt_value", + ) + repr_str = repr(resp) + assert "secret_at_value" not in repr_str + assert "secret_id_token" not in repr_str + assert "secret_rt_value" not in repr_str + assert "[REDACTED]" in repr_str + assert "86400" in repr_str + + +@pytest.mark.asyncio +async def test_signin_with_passkey_preserves_server_expires_at(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value={ + "access_token": "at_123", + "token_type": "Bearer", + "expires_in": 3600, + "expires_at": 9999999999, + }) + mock_post.return_value = mock_response + + result = await client.signin_with_passkey( + auth_session="session", + authn_response=_make_passkey_authn_response(), + ) + assert result.state_data["token_sets"][0]["expires_at"] == 9999999999 + + +@pytest.mark.asyncio +async def test_signin_with_passkey_missing_expires_at_calculates(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value={ + "access_token": "at_123", + "token_type": "Bearer", + "expires_in": 60, + }) + mock_post.return_value = mock_response + + result = await client.signin_with_passkey( + auth_session="session", + authn_response=_make_passkey_authn_response(), + ) + assert abs(result.state_data["token_sets"][0]["expires_at"] - (int(time.time()) + 60)) <= 2 + + +@pytest.mark.asyncio +async def test_signin_with_passkey_dpop_attaches_proof_header(mocker): + import base64 + import json as _json + from jwcrypto import jwk as jwk_module + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE_DPOP) + mock_post.return_value = mock_response + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + dpop_key=dpop_key, + ) + + args, kwargs = mock_post.call_args + assert "DPoP" in kwargs["headers"] + + # Decode proof and assert no ath claim (token endpoint proof — RFC 9449 §4.2) + proof = kwargs["headers"]["DPoP"] + payload_b64 = proof.split(".")[1] + padding = 4 - len(payload_b64) % 4 + payload = _json.loads(base64.urlsafe_b64decode(payload_b64 + "=" * padding)) + assert "ath" not in payload + assert "jti" in payload + assert payload["htm"] == "POST" + assert payload["htu"] == "https://auth0.local/oauth/token" + + +@pytest.mark.asyncio +async def test_signin_with_passkey_dpop_nonce_retry(mocker): + import base64 + import json as _json + from jwcrypto import jwk as jwk_module + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + + nonce_response = AsyncMock() + nonce_response.status_code = 401 + nonce_response.headers = {"DPoP-Nonce": "server-nonce-abc"} + nonce_response.json = MagicMock(return_value={"error": "use_dpop_nonce"}) + + success_response = AsyncMock() + success_response.status_code = 200 + success_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE_DPOP) + + mock_post.side_effect = [nonce_response, success_response] + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + result = await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + dpop_key=dpop_key, + ) + + assert mock_post.await_count == 2 + assert result.state_data["token_sets"][0]["access_token"] == "at_passkey_dpop_123" + + # Second call must include the nonce in the DPoP proof + second_call_kwargs = mock_post.call_args_list[1][1] + proof = second_call_kwargs["headers"]["DPoP"] + payload_b64 = proof.split(".")[1] + padding = 4 - len(payload_b64) % 4 + payload = _json.loads(base64.urlsafe_b64decode(payload_b64 + "=" * padding)) + assert payload["nonce"] == "server-nonce-abc" + + +@pytest.mark.asyncio +async def test_signin_with_passkey_dpop_rejects_bearer_downgrade(mocker): + """Server returning token_type=Bearer when DPoP was requested must raise PasskeyError.""" + from auth0_server_python.error import PasskeyError + from jwcrypto import jwk as jwk_module + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + dpop_key = jwk_module.JWK.generate(kty="EC", crv="P-256") + with pytest.raises(PasskeyError) as exc: + await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + dpop_key=dpop_key, + ) + assert "DPoP" in str(exc.value) or "token_type" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_signin_with_passkey_missing_issuer_in_metadata(mocker): + """Missing 'issuer' in OIDC metadata must raise IssuerValidationError, not silently pass.""" + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + with pytest.raises(Exception) as exc: + await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + ) + assert "issuer" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_signin_with_passkey_without_dpop_no_dpop_header(mocker): + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", "iss": "https://auth0.local/" + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + ) + + args, kwargs = mock_post.call_args + assert "DPoP" not in kwargs.get("headers", {}) + + +@pytest.mark.asyncio +async def test_signin_with_passkey_creates_session_in_state_store(mocker): + """signin_with_passkey must persist a session — consistent with complete_interactive_login.""" + from auth0_server_python.auth_types import PasskeyLoginResult + state_store = AsyncMock() + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=state_store, + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mocker.patch.object(client, "_get_jwks_cached", return_value={}) + mocker.patch.object(client, "_verify_and_decode_jwt", return_value={ + "sub": "auth0|user123", + "name": "Jane Doe", + "email": "jane@example.com", + "iss": "https://auth0.local/", + "sid": "session_sid_abc", + }) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value=_PASSKEY_TOKEN_RESPONSE) + mock_post.return_value = mock_response + + result = await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + ) + + # State store must be called exactly once + state_store.set.assert_awaited_once() + + # Result must be PasskeyLoginResult, not bare tokens + assert isinstance(result, PasskeyLoginResult) + + # State data must contain user, token_sets, domain, internal + sd = result.state_data + assert sd["user"]["sub"] == "auth0|user123" + assert sd["user"]["name"] == "Jane Doe" + assert sd["token_sets"][0]["access_token"] == "at_passkey_123" + assert sd["id_token"] == "eyJ.test.jwt" + assert sd["refresh_token"] is None + assert sd["domain"] == "auth0.local" + assert sd["internal"]["sid"] == "session_sid_abc" + assert "created_at" in sd["internal"] + + +@pytest.mark.asyncio +async def test_signin_with_passkey_session_without_id_token(mocker): + """When no id_token is returned, session is still created with user=None.""" + from auth0_server_python.auth_types import PasskeyLoginResult + state_store = AsyncMock() + client = ServerClient( + domain="auth0.local", + client_id="test_client_id", + client_secret="test_client_secret", + state_store=state_store, + transaction_store=AsyncMock(), + secret="test-secret-value", + ) + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"token_endpoint": "https://auth0.local/oauth/token", "issuer": "https://auth0.local/"}, + ) + mock_post = mocker.patch("httpx.AsyncClient.post", new_callable=AsyncMock) + mock_response = AsyncMock() + mock_response.status_code = 200 + mock_response.json = MagicMock(return_value={ + "access_token": "at_no_id_token", + "token_type": "Bearer", + "expires_in": 3600, + }) + mock_post.return_value = mock_response + + result = await client.signin_with_passkey( + auth_session="session_xyz", + authn_response=_make_passkey_authn_response(), + ) + + assert isinstance(result, PasskeyLoginResult) + state_store.set.assert_awaited_once() + assert result.state_data["user"] is None + assert result.state_data["token_sets"][0]["access_token"] == "at_no_id_token"