Skip to content

fix(rab): run async background boundary refresh on detached session#17441

Open
nbayati wants to merge 14 commits into
googleapis:mainfrom
nbayati:fix-async-rab-transport
Open

fix(rab): run async background boundary refresh on detached session#17441
nbayati wants to merge 14 commits into
googleapis:mainfrom
nbayati:fix-async-rab-transport

Conversation

@nbayati

@nbayati nbayati commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

When AuthorizedSession.request() makes an API call, it runs inside a temporary aiohttp ClientSession block. If our background Regional Access Boundary (RAB) refresh worker naively shares this exact same session, a fast primary call (like an instant 401/403 or a quick CRM check) will exit its block and close the active socket mid-flight. This causes the background worker to silently fail with "RuntimeError: Session is closed" and forces the RAB manager into a 15-minute cooldown.

This commit solves the race condition by giving the background worker its own separate transport session:

  • Added a clone() method to async Request adapters to create a fresh, independent ClientSession sharing the exact same corporate proxy and trace configurations.
  • Updated _AsyncRegionalAccessBoundaryRefreshManager to unwrap any partial timeouts, clone the request, and run the background lookup on that detached session.
  • Added a finally block to make sure the detached session gets properly closed when the lookup settles to prevent socket leaks.

To recreate the bug consistently and verify our fix, I used a reproduction script (paste/6312404345552896) that simulates a fast main API call (like an instant 401/403 Edge drop or highly optimized endpoint) by cancelling the primary request 30ms after it is dispatched.

nbayati added 2 commits June 11, 2026 17:42
Prevents fast primary API calls from closing the underlying aiohttp session
mid-flight and breaking background Regional Access Boundary (RAB) lookups.
Adds a clone() method to async Request adapters to run background refreshes
on an independent session, closing it cleanly when finished.
@nbayati nbayati requested review from a team as code owners June 12, 2026 02:25

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces request cloning for asynchronous regional access boundary refreshes to prevent race conditions where a foreground call closes a shared session. It adds a clone() method to the aiohttp request adapters and ensures that cloned requests are cleanly closed. The review feedback recommends ensuring the inspect module is imported and improving the exception handling when closing the cloned request by catching general exceptions and logging a warning instead of silently ignoring specific errors.

Comment thread packages/google-auth/google/auth/_regional_access_boundary_utils.py Outdated
@nbayati nbayati changed the title Fix async rab transport fix(rab): run async background boundary refresh on detached session Jun 12, 2026

@lsirac lsirac left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the PR! Just a few comments on potential resource leaks, test flakiness, coverage, and assertion safety.

Comment thread packages/google-auth/google/auth/_regional_access_boundary_utils.py Outdated
Comment thread packages/google-auth/tests/test__regional_access_boundary_utils.py Outdated
Comment thread packages/google-auth/tests/test__regional_access_boundary_utils.py Outdated
Comment thread packages/google-auth/tests/transport/aio/test_aiohttp.py
@nbayati nbayati requested a review from lsirac June 12, 2026 17:33
await self._session.close()
self._closed = True

def clone(self) -> "Request":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current clone() implementation creates a brand-new, default ClientSession. This drops all customized connection-level configurations on the original transport, such as proxies, SSL/TLS contexts, and private CA certificates (which are configured on the underlying TCPConnector), along with session-level default headers, cookies, basic auth, and timeouts. This will cause requests to fail in secure enterprise environments requiring custom proxy or cert setups.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I updated Request.clone() to fully reconstruct the underlying network connector (TCPConnector and UnixConnector) so that custom SSL contexts, IP bindings, and connection limits are preserved. I also made sure that all session-level defaults (including headers, cookies, basic auth, and custom timeouts) are completely mirrored in the cloned session.

Comment thread packages/google-auth/google/auth/aio/transport/__init__.py
Comment thread packages/google-auth/google/auth/aio/transport/__init__.py Outdated
Comment thread packages/google-auth/google/auth/_regional_access_boundary_utils.py Outdated
Comment thread packages/google-auth/google/auth/_regional_access_boundary_utils.py Outdated
@nbayati nbayati requested review from daniel-sanche and lsirac June 12, 2026 19:44
def _clone(self) -> "Request":
"""Creates a copy of this request adapter.
The base implementation returns `self` (an identical shared instance).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I still think a name other than clone should be considered. Gemini suggests _isolate() or _branch(). But this doesn't matter too much if it's internal

return new_request, cloned_callable, is_cloned


async def _close_cloned_request(lookup_request, is_cloned):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It seems like _prepare_async_lookup_callable and _close_cloned_request would be a good fit for a context manager. That would let us encapsulate these three variables, and enforce automatic closing.

Gemini put this together:

@contextlib.asynccontextmanager
async def _managed_lookup_callable(request):
    """An async context manager that prepares a cloned lookup callable 
    and guarantees its transport is closed on exit.
    """
    lookup_callable, lookup_request, is_cloned = _prepare_async_lookup_callable(request)
    try:
        yield lookup_callable
    finally:
        await _close_cloned_request(lookup_request, is_cloned)


# ... Inside your class/function where _worker is defined:

async def _worker():
    try:
        async with _managed_lookup_callable(request) as lookup_callable:
            regional_access_boundary_info = (
                await credentials._lookup_regional_access_boundary(lookup_callable)
            )
    except Exception as e:
        if _helpers.is_logging_enabled(_LOGGER):
            _LOGGER.warning(
                "Failed regional access boundary lookup: %s", 
                e, 
                exc_info=True
            )
        regional_access_boundary_info = None

But this is just a suggestion that came to mind, I think it's fine to merge as-is too.

new_exc = exceptions.TransportError(caught_exc)
raise new_exc from caught_exc

def clone(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the recent push, the regional access boundary check was updated to search for _clone(), and the modern transport aiohttp.py was updated accordingly. However, google/auth/transport/_aiohttp_requests.py was left with the public clone() method.

Because hasattr(base_callable, "_clone") evaluates to False on _aiohttp_requests.Request, it will completely skip cloning and silently fall back to the un-cloned session, re-introducing the original "Session is closed" background crash.

We should rename this method to _clone(self) (or add a private _clone alias pointing to clone for backward compatibility) and update its implementation to copy the connector configuration, default headers, cookies, auth, and timeouts just like aiohttp.py does.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch. I missed updating the legacy transport in the previous commit. I've renamed the method to _clone() in _aiohttp_requests.py and updated its implementation to fully copy the ClientSession and TCPConnector configurations (headers, cookies, timeouts, trace configs, etc.) to mirror what we did in aiohttp.py. I also added the corresponding test coverage for it


exc.match("session is closed.")
aiohttp_request._closed = False

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added cloning and close methods on the Request adapter in google/auth/transport/_aiohttp_requests.py currently have 0% test coverage.

We should add unit tests in tests_async/transport/test_aiohttp_requests.py to verify that _aiohttp_requests.Request.clone() and .close() work correctly, replicate the session configuration, and are idempotent.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to test coverate, I noticed today that the CI check for that is failing open.

@nbayati you can, try to run the check locally, so we don't have too much clean-up to do here when the CI issue is resolved

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lsirac Done!

@daniel-sanche Thanks for the heads-up. I ran the check locally and I'm adding unit tests to cover the missing paths.

@nbayati nbayati requested a review from lsirac June 12, 2026 22:48

@lsirac lsirac left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are 3 critical, subtle architectural findings regarding the custom _clone() implementation that was recently introduced for async Regional Access Boundary refreshes.

resolver=getattr(orig_connector, "_resolver", None),
local_addr=getattr(orig_connector, "_local_addr", None),
)
elif isinstance(orig_connector, aiohttp.UnixConnector):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Windows systems, aiohttp.UnixConnector is not defined in the aiohttp namespace. Accessing it directly here via isinstance(orig_connector, aiohttp.UnixConnector) will raise an AttributeError and crash the application during any cloned transport invocation (such as asynchronous Regional Access Boundary refreshes), even if standard TCP connectors are used.

To fix this, access it safely using getattr as is done correctly in _aiohttp_requests.py:

elif getattr(aiohttp, "UnixConnector", None) and isinstance(
    orig_connector, getattr(aiohttp, "UnixConnector")
):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

limit=getattr(orig_connector, "_limit", 100),
limit_per_host=getattr(orig_connector, "_limit_per_host", 0),
force_close=getattr(orig_connector, "_force_close", False),
resolver=getattr(orig_connector, "_resolver", None),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying the original connector's resolver via resolver=getattr(orig_connector, "_resolver", None) shares the exact same stateful DNS resolver instance between the original and cloned sessions.

When the background refresh worker finishes and closes the cloned transport, the cloned session closes its connector, terminating the shared resolver (e.g., in AsyncResolver, this cancels and clears the internal resolver reference). This permanently breaks the original/surviving session, causing subsequent foreground requests to fail with an AttributeError during DNS resolution.

To resolve this, avoid copying the resolver instance and let the cloned connector instantiate its own independent resolver.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

# Copy underlying connection pool settings (SSL context, IP bindings, limits).
orig_connector = getattr(self._session, "_connector", None)
if orig_connector and not orig_connector.closed:
if isinstance(orig_connector, aiohttp.TCPConnector):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _clone() implementation explicitly checks only for standard TCPConnector and UnixConnector instances. If the original session is configured with a custom, proxy, or subclassed connector (such as corporate SOCKS or tunneling proxies), the check falls through and the cloned session is created with a default, direct-connection TCPConnector.

This silently drops the proxy/custom configuration and routes traffic directly over the public internet, which will fail or violate security constraints in enterprise/isolated cloud environments. We should either explicitly support proxy preservation or raise a clear transport exception if an unsupported custom connector is detected.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point.

We can't really support proxy preservation because third-party aiohttp connectors have arbitrary, unknown constructor signatures (meaning we have no way to instantiate a fresh detached copy of them dynamically), and simply shallow-copying the existing connector is unsafe due to shared socket pools. This leaves us two options: fallback to re-using the customer transport and hope that we don't encounter the bug this PR is trying to fix, or raise the exception as you suggested and accept this as a limitation of RAB.

I've decided not to fallback to re-using the customer's transport if we can't clone it, because it's not just that the RAB call would fail, but also there's another risk: if the foreground task closes the session while the background worker is actively reading from it, the forceful socket truncation mid-flight can leave complex corporate proxy connections in a hung or corrupted state, which means that the affects won't be limited to our RAB calls. So I've added the else: raise exceptions.TransportError(...) block, as raising the error here is the safest path. The exception will trigger the 15-minute cooldown and allow the user's main request to proceed safely.

I thought about disabling RAB permanently if we can't clone the transport (thinking what's the point of entering cooldown if we're going to keep trying to clone it and fail), but decided against it. I realized that because credentials objects are frequently instantiated globally and shared across multiple different clients and API surfaces, there's a chance that the next call would be executed over entirely different transports, making the RAB call possible.

@nbayati nbayati requested a review from lsirac June 13, 2026 00:52
lookup_callable,
lookup_request,
is_cloned,
) = _prepare_async_lookup_callable(request)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientSession can still be closed and the request will still fail with the current implementation as with the request being created inside the background worker, start_refresh() will potentially return before _clone() runs.

Returns:
google.auth.aio.transport.aiohttp.Request: A request adapter copy
running a new aiohttp.ClientSession with identical connection,
proxy, and session configurations.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This docstring can be reworded. I think the current implementation of clone does not actually clone everything. Also doesn't mention unsupported cases.

Comment on lines +499 to +500
maybe_coro = lookup_request.close()
if is_async := inspect.iscoroutine(maybe_coro):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we are missing valid awaitable cases here as written (e.g. if Future returned by custom transports)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants