Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ Legacy OAuth2 Properties will be removed in PyIceberg 1.0 in place of pluggable
| rest.signing-region | us-east-1 | The region to use when SigV4 signing a request |
| rest.signing-name | execute-api | The service signing name to use when SigV4 signing a request |

SigV4 can also be enabled as `auth.type: sigv4`, which additionally lets you choose the wrapped header-based auth (see the AuthManager section below).

##### Pluggable Authentication via AuthManager

The RESTCatalog supports pluggable authentication via the `auth` configuration block. This allows you to specify which how the access token will be fetched and managed for use with the HTTP requests to the RESTCatalog server. The authentication method is selected by setting the `auth.type` property, and additional configuration can be provided as needed for each method.
Expand All @@ -396,6 +398,7 @@ The RESTCatalog supports pluggable authentication via the `auth` configuration b
- `custom`: Custom authentication manager (requires `auth.impl`).
- `google`: Google Authentication support
- `entra`: Microsoft Entra ID (Azure AD) authentication support
- `sigv4`: AWS SigV4 request signing, optionally wrapping a delegate auth type.

###### Configuration Properties

Expand Down Expand Up @@ -424,6 +427,7 @@ catalog:
| `auth.custom` | If type is `custom` | Block containing configuration for the custom AuthManager. |
| `auth.google` | If type is `google` | Block containing `credentials_path` to a service account file (if using). Will default to using Application Default Credentials. |
| `auth.entra` | If type is `entra` | Block containing Entra ID configuration. Will default to using DefaultAzureCredential. |
| `auth.sigv4` | If type is `sigv4` | Block containing an optional `delegate` auth block whose `Authorization` header is preserved as `Original-Authorization` after signing. Signing region/name come from `rest.signing-region`/`rest.signing-name`; AWS credentials from `client.*` or the standard boto3 chain. |

###### Examples

Expand Down Expand Up @@ -469,6 +473,24 @@ auth:
property2: value2
```

SigV4 Signing (wrapping OAuth2):

```yaml
auth:
type: sigv4
sigv4:
delegate:
type: oauth2
oauth2:
client_id: my-client-id
client_secret: my-client-secret
token_url: https://auth.example.com/oauth/token
rest.signing-region: us-east-1
rest.signing-name: execute-api
client.access-key-id: my-access-key
client.secret-access-key: my-secret-key
```

###### Notes

- If `auth.type` is `custom`, you **must** specify `auth.impl` with the full class path to your custom AuthManager.
Expand Down
148 changes: 79 additions & 69 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@

from pyiceberg import __version__
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.auth import (
AUTH_MANAGER,
AuthManager,
AuthManagerAdapter,
AuthManagerFactory,
LegacyOAuth2AuthManager,
NoopAuthManager,
SigV4AuthManager,
)
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.catalog.rest.scan_planning import (
FetchScanTasksRequest,
Expand Down Expand Up @@ -251,11 +259,11 @@ class ScanPlanningMode(Enum):
CA_BUNDLE = "cabundle"
SSL = "ssl"
SIGV4 = "rest.sigv4-enabled"
SIGV4_AUTH_TYPE = "sigv4"
SIGV4_REGION = "rest.signing-region"
SIGV4_SERVICE = "rest.signing-name"
SIGV4_MAX_RETRIES = "rest.sigv4.max-retries"
SIGV4_MAX_RETRIES_DEFAULT = 10
EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
OAUTH2_SERVER_URI = "oauth2-server-uri"
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
AUTH = "auth"
Expand Down Expand Up @@ -435,10 +443,59 @@ def _create_session(self) -> Session:
elif ssl_client_cert := ssl_client.get(CERT):
session.cert = ssl_client_cert

self._auth_manager = self._build_auth_manager(session)
session.auth = AuthManagerAdapter(self._auth_manager)

# SigV4 retry is decoupled from signing: mount a plain retry adapter.
if self._is_sigv4_enabled():
from requests.adapters import HTTPAdapter

max_retries = property_as_int(self.properties, SIGV4_MAX_RETRIES, SIGV4_MAX_RETRIES_DEFAULT)
session.mount(self.uri, HTTPAdapter(max_retries=max_retries))

return session

def _is_sigv4_enabled(self) -> bool:
"""Return True if SigV4 signing is requested via either config path."""
if property_as_bool(self.properties, SIGV4, False):
return True
auth_config = self.properties.get(AUTH)
return auth_config is not None and auth_config.get("type") == SIGV4_AUTH_TYPE

def _build_auth_manager(self, session: Session) -> AuthManager:
"""Build the AuthManager, wrapping the delegate in SigV4 when enabled."""
delegate = self._build_delegate_auth_manager(session)
if self._is_sigv4_enabled():
if property_as_bool(self.properties, SIGV4, False):
deprecation_message(
deprecated_in="0.11.0",
removed_in="1.0.0",
help_message=f"The property {SIGV4} is deprecated. Please use auth.type={SIGV4_AUTH_TYPE} instead",
)
return self._build_sigv4_auth_manager(delegate)
return delegate

def _build_delegate_auth_manager(self, session: Session) -> AuthManager:
"""Build the header-based AuthManager (the SigV4 delegate, or the manager used directly)."""
if auth_config := self.properties.get(AUTH):
auth_type = auth_config.get("type")
if auth_type is None:
raise ValueError("auth.type must be defined")

if auth_type == SIGV4_AUTH_TYPE:
if auth_config.get("impl"):
raise ValueError("auth.impl can only be specified when using custom auth.type")
# The delegate is configured under auth.sigv4.delegate.*
sigv4_config = auth_config.get(SIGV4_AUTH_TYPE, {})
delegate_config = sigv4_config.get("delegate")
if not delegate_config or "type" not in delegate_config:
# No delegate configured: SigV4-only auth, with no header-based delegate.
return NoopAuthManager()
delegate_type = delegate_config["type"]
if delegate_type == SIGV4_AUTH_TYPE:
raise ValueError("Cannot delegate a SigV4 auth manager to another SigV4 auth manager")
return AuthManagerFactory.create(delegate_type, delegate_config.get(delegate_type, {}))

auth_type_config = auth_config.get(auth_type, {})
auth_impl = auth_config.get("impl")

Expand All @@ -448,17 +505,28 @@ def _create_session(self) -> Session:
if auth_type != CUSTOM and auth_impl:
raise ValueError("auth.impl can only be specified when using custom auth.type")

self._auth_manager = AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)
session.auth = AuthManagerAdapter(self._auth_manager)
else:
self._auth_manager = self._create_legacy_oauth2_auth_manager(session)
session.auth = AuthManagerAdapter(self._auth_manager)
return AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)

# Configure SigV4 Request Signing
if property_as_bool(self.properties, SIGV4, False):
self._init_sigv4(session)
return self._create_legacy_oauth2_auth_manager(session)

return session
def _build_sigv4_auth_manager(self, delegate: AuthManager) -> AuthManager:
"""Wrap the delegate AuthManager in a SigV4AuthManager."""
import boto3

boto_session = boto3.Session(
profile_name=get_first_property_value(self.properties, AWS_PROFILE_NAME),
region_name=get_first_property_value(self.properties, AWS_REGION),
botocore_session=self.properties.get(BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(self.properties, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(self.properties, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(self.properties, AWS_SESSION_TOKEN),
)
return SigV4AuthManager(
delegate=delegate,
boto_session=boto_session,
region=self.properties.get(SIGV4_REGION),
service=self.properties.get(SIGV4_SERVICE, "execute-api"),
)

@staticmethod
def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
Expand Down Expand Up @@ -761,64 +829,6 @@ def _split_identifier_for_json(self, identifier: str | Identifier) -> dict[str,
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}

def _init_sigv4(self, session: Session) -> None:
from urllib import parse

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from requests import PreparedRequest
from requests.adapters import HTTPAdapter

class SigV4Adapter(HTTPAdapter):
def __init__(self, **properties: str):
self._properties = properties
max_retries = property_as_int(self._properties, SIGV4_MAX_RETRIES, SIGV4_MAX_RETRIES_DEFAULT)
super().__init__(max_retries=max_retries)
self._boto_session = boto3.Session(
profile_name=get_first_property_value(self._properties, AWS_PROFILE_NAME),
region_name=get_first_property_value(self._properties, AWS_REGION),
botocore_session=self._properties.get(BOTOCORE_SESSION),
aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
)

def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylint: disable=W0613
credentials = self._boto_session.get_credentials().get_frozen_credentials()
region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
service = self._properties.get(SIGV4_SERVICE, "execute-api")

url = str(request.url).split("?")[0]
query = str(parse.urlsplit(request.url).query)
params = dict(parse.parse_qsl(query))

# remove the connection header as it will be updated after signing
if "connection" in request.headers:
del request.headers["connection"]
# For empty bodies, explicitly set the content hash header to the SHA256 of an empty string
if not request.body:
request.headers["x-amz-content-sha256"] = EMPTY_BODY_SHA256

aws_request = AWSRequest(
method=request.method, url=url, params=params, data=request.body, headers=dict(request.headers)
)

SigV4Auth(credentials, service, region).add_auth(aws_request)
original_header = request.headers
signed_headers = aws_request.headers
relocated_headers = {}

# relocate headers if there is a conflict with signed headers
for header, value in original_header.items():
if header in signed_headers and signed_headers[header] != value:
relocated_headers[f"Original-{header}"] = value

request.headers.update(relocated_headers)
request.headers.update(signed_headers)

session.mount(self.uri, SigV4Adapter(**self.properties))

def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
# Per Iceberg spec: storage-credentials take precedence over config
credential_config = self._resolve_storage_credentials(
Expand Down
Loading