diff --git a/KERNEL_REV b/KERNEL_REV index 696572aef..2031bb3d7 100644 --- a/KERNEL_REV +++ b/KERNEL_REV @@ -1 +1 @@ -f4ee6fec78aabce8c0ea9c1ff47fc11b8191d013 +7ffb30d533c08651ca707b8dd13894c9e01cb68e diff --git a/src/databricks/sql/backend/kernel/client.py b/src/databricks/sql/backend/kernel/client.py index cb3b0b7ba..c6e66d661 100644 --- a/src/databricks/sql/backend/kernel/client.py +++ b/src/databricks/sql/backend/kernel/client.py @@ -16,13 +16,6 @@ - ``query_tags`` on execute is not supported (kernel exposes ``statement_conf`` but PyO3 doesn't surface it). -- ``get_tables`` with a non-empty ``table_types`` filter applies - the filter client-side; today the kernel returns the full - ``SHOW TABLES`` shape unchanged. The connector's existing - ``ResultSetFilter.filter_tables_by_type`` is keyed on - ``SeaResultSet`` not ``KernelResultSet``, so we punt and let - the caller see all rows — documented as a known gap in the - design doc. - Volume PUT/GET (staging operations): kernel has no Volume API yet. Users on Thrift-only paths. """ @@ -32,7 +25,7 @@ import logging import threading import uuid -from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Union +from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union from databricks.sql.backend.databricks_client import DatabricksClient from databricks.sql.backend.kernel._errors import ( @@ -52,7 +45,6 @@ from databricks.sql.exc import ( InterfaceError, NotSupportedError, - OperationalError, ProgrammingError, ) from databricks.sql.thrift_api.TCLIService import ttypes @@ -107,6 +99,57 @@ def _strip_leading_sql_comments(sql: str) -> str: return sql[i:] +def _is_not_found(exc: BaseException) -> bool: + """True iff ``exc`` is a kernel ``NotFound`` error (HTTP 404 / + ``STATEMENT_NOT_FOUND``). + + Used by ``get_query_state`` to recognise an async statement the + server no longer knows about (closed and aged out of the result TTL, + or never a server statement) and treat it as terminal, rather than + surfacing a raw error. Keyed on the kernel ``ErrorCode`` string + (``"NotFound"``) — verified live against a SEA warehouse: an + unknown/expired statement id returns 404 which the kernel maps to + ``ErrorCode::NotFound`` (``retryable=False``), distinct from a + transient 5xx (retryable) or a malformed-id 400.""" + return ( + isinstance(exc, _kernel.KernelError) + and getattr(exc, "code", None) == "NotFound" + ) + + +def _none_if_blank(value: Optional[str]) -> Optional[str]: + """Map an empty/whitespace-only metadata filter to ``None`` + ("match all"), matching the Thrift backend's effective behaviour. + + The kernel's ``Identifier`` / ``LikePattern`` reject ``""`` with + ``InvalidArgument`` (-> ``ProgrammingError``); ``None`` is the + kernel's canonical "match all". Applied to schema / table / column + *pattern* args (which otherwise keep ``%`` / ``_`` as real LIKE + wildcards).""" + if value is None: + return None + return value if value.strip() else None + + +def _catalog_or_none(value: Optional[str]) -> Optional[str]: + """Normalise a catalog filter: ``None`` / blank / ``'%'`` / ``'*'`` + all mean "all catalogs" -> ``None``. + + This makes ``columns(catalog='%')`` behave like + ``tables(catalog='%')`` / ``schemas(catalog='%')`` — the kernel + already treats blank/``%``/``*`` as "all catalogs" for SHOW SCHEMAS + / SHOW TABLES (``is_null_or_wildcard``) but treats the catalog as an + exact identifier for SHOW COLUMNS, so the three diverged. Normalising + connector-side makes them symmetric. This intentionally diverges from + raw-Thrift literalness (Thrift treats ``%`` as a literal catalog + name) in favour of JDBC "catalog is exact-or-all, not a pattern" + + internal consistency. Catalog is the only arg normalised this way; + schema/table/column patterns keep ``%`` / ``*`` as LIKE wildcards.""" + if value is None or not value.strip() or value in ("%", "*"): + return None + return value + + def _is_staging_statement(operation: str) -> bool: """True iff ``operation`` is a volume/staging statement (PUT / GET / REMOVE). @@ -202,9 +245,20 @@ def __init__( self._kernel_session: Optional[Any] = None self._session_id: Optional[SessionId] = None # Async-exec handles keyed by CommandId.guid. Populated by - # ``execute_command(async_op=True)``; drained by ``close_command``. - # Guarded by ``_async_handles_lock`` so concurrent cursors on the - # same connection don't race on submit / close / close-session. + # ``execute_command(async_op=True)``; drained by ``close_command`` + # / ``close_session``. Guarded by ``_async_handles_lock`` so + # concurrent cursors on the same connection don't race on submit / + # close / close-session. + # + # This is a KEEP-ALIVE registry, not a state/result lookup: the + # submitting ``ExecutedAsyncStatement``'s ``Drop`` fires a + # fire-and-forget ``close_statement``, which would kill the + # still-running async query the moment the handle is dropped. We + # retain it (and its parent ``Statement``) here so the live query + # survives until an explicit close. ``get_query_state`` / + # ``get_execution_result`` do NOT consult this map — they + # re-attach to the statement by id (the server is the source of + # truth for async state), so they work even cross-process. self._async_handles: Dict[str, Any] = {} # Parent ``Statement`` objects kept alive alongside async handles. # On the kernel, ``Statement.close()`` flips the validity flag on @@ -215,12 +269,6 @@ def __init__( # ``close_command`` / ``close_session`` after the async handle # has finished its work. self._async_statements: Dict[str, Any] = {} - # CommandId.guids of async commands that have already been - # closed (via ``close_command`` or ``close_session``). Lets - # ``get_query_state`` report ``CLOSED`` for them rather than - # the SUCCEEDED fall-through used for the never-tracked sync - # path. Same lock as ``_async_handles``. - self._closed_commands: Set[str] = set() self._async_handles_lock = threading.RLock() # Sync-execute cancellers keyed by ``id(cursor)``. A blocking # ``execute()`` sets ``cursor.active_command_id`` only AFTER it @@ -354,8 +402,6 @@ def close_session(self, session_id: SessionId) -> None: tracked_stmts = list(self._async_statements.items()) self._async_handles.clear() self._async_statements.clear() - for guid, _ in tracked: - self._closed_commands.add(guid) for _, handle in tracked: # Per-handle close errors are non-fatal — PEP 249 # discourages raising from session close — so log and @@ -487,6 +533,27 @@ def execute_command( # produced to reap it. close_stmt = False except Exception as exc: + # Failed sync execute: publish the server-issued + # statement id (observed mid-execute via the canceller's + # inflight slot, still registered here — the finally pops + # it) so the cursor's query_id reflects the FAILED query, + # matching the Thrift backend which sets active_command_id + # on every execute regardless of outcome. statement_id() + # is None for a pre-id failure (transport error on the + # initial POST) — then leave active_command_id untouched. + # Best-effort; never mask the original failure. + try: + with self._sync_cancellers_lock: + canceller = self._sync_cancellers.get(id(cursor)) + stmt_id = ( + canceller.statement_id() if canceller is not None else None + ) + if stmt_id: + cursor.active_command_id = CommandId.from_sea_statement_id( + stmt_id + ) + except Exception: + pass raise _wrap_kernel_exception("execute_command", exc) from exc finally: with self._sync_cancellers_lock: @@ -502,7 +569,21 @@ def execute_command( pass command_id = CommandId.from_sea_statement_id(executed.statement_id) - cursor.active_command_id = command_id + # Surface the affected-row count for DML (INSERT/UPDATE/DELETE/ + # MERGE) as ``cursor.rowcount`` instead of the hardcoded ``-1``. + # ``num_modified_rows`` is ``None`` for SELECT (and warehouses + # that don't report it) → leave ``rowcount`` at its ``-1`` + # default. ``getattr`` guards against an older kernel wheel that + # predates the pyo3 getter. NB the Thrift backend also hardcodes + # ``-1`` here, so this makes the kernel path *exceed* Thrift. + try: + modified = getattr(executed, "num_modified_rows", None) + if callable(modified): + modified = modified() + except Exception: + modified = None + if modified is not None: + cursor.rowcount = modified # ``KernelResultSet.__init__`` calls ``arrow_schema()`` which # can itself raise ``KernelError`` (or, in principle, a PyO3 # native exception) — wrap the construction so callers see a @@ -571,10 +652,10 @@ def close_command(self, command_id: CommandId) -> None: with self._async_handles_lock: handle = self._async_handles.pop(command_id.guid, None) stmt = self._async_statements.pop(command_id.guid, None) - if handle is not None: - # Record the close so ``get_query_state`` can report - # ``CLOSED`` (not ``SUCCEEDED``) for this command. - self._closed_commands.add(command_id.guid) + # Closing the handle below fires the server-side CloseStatement. + # A subsequent ``get_query_state`` re-attaches by id and reads + # ``CLOSED`` straight from the server — no connector-side + # closed-state bookkeeping needed. if handle is None: logger.debug("close_command: no tracked handle for %s", command_id) # Still drop the parent Statement if somehow tracked without @@ -600,22 +681,40 @@ def close_command(self, command_id: CommandId) -> None: pass def get_query_state(self, command_id: CommandId) -> CommandState: - with self._async_handles_lock: - handle = self._async_handles.get(command_id.guid) - already_closed = command_id.guid in self._closed_commands - if handle is None: - if already_closed: - # We tracked this async handle and have since closed - # it; the command is no longer queryable on the - # server but the connector still has the id. - return CommandState.CLOSED - # No tracked async handle and never closed: execute_command - # ran sync and the result was materialised before - # returning. Terminal by construction. - return CommandState.SUCCEEDED + # Server is the source of truth for async command state. Re-attach + # to the statement by its id and read the state the server reports + # — no connector-side state to drift. SEA keys GetStatementStatus + # purely on the id, so a statement the connector no longer holds a + # handle for (or never held — a different process) is still + # queryable. CLOSED comes straight from the server: after a + # statement is closed (DELETE) the server still returns 200 + # state=CLOSED until the result TTL elapses. + if self._kernel_session is None: + raise InterfaceError("get_query_state requires an open session.") try: + handle = self._kernel_session.attach_async_statement(command_id.guid) state, failure = handle.status() except Exception as exc: + if _is_not_found(exc): + # The server doesn't recognise the id (404). Two cases, + # disambiguated by whether we ever tracked it as async: + # * still in _async_handles -> a live async command the + # server lost? Shouldn't happen; fall through to the + # not-tracked answer below. + # * not (or no longer) tracked async -> either a sync + # command (its id was never a standalone server + # statement; terminal-by-construction since the result + # was materialised before execute_command returned) or + # an async command that was closed and has since aged + # out of the server's result TTL. + # A closed async command reports CLOSED via the 200 path + # above for as long as it's queryable; once it 404s it's + # genuinely gone. SUCCEEDED is the truthful terminal answer + # for the sync case and a harmless one for an aged-out + # closed command (a client polling a closed command saw + # CLOSED while it was live). Matches the prior + # sync-fall-through behaviour. + return CommandState.SUCCEEDED raise _wrap_kernel_exception("get_query_state", exc) from exc if state == "Failed" and failure is not None: # Surface server-reported failure as a database error so @@ -639,47 +738,26 @@ def get_execution_result( command_id: CommandId, cursor: "Cursor", ) -> "ResultSet": - with self._async_handles_lock: - async_exec = self._async_handles.get(command_id.guid) - if async_exec is None: - raise ProgrammingError( - "get_execution_result called for an unknown command_id; " - "the kernel backend only tracks async-submitted statements." - ) + # Re-attach to the statement by id and await its result. SEA keys + # GetStatementResult on the id, so this works whether or not the + # connector still holds the submitting handle — and it's + # inherently re-callable (each call attaches a fresh handle and + # re-materialises the result stream), matching the Thrift backend + # where the operation handle stays re-fetchable until an explicit + # close. No connector-side handle lookup, so no + # ``unknown command_id`` failure on a second call. + # + # ``attach_async_statement`` issues a GetStatementStatus to seed + # the handle; a 404 (unknown / aged-out id) surfaces as a + # NotFound KernelError mapped to ``ProgrammingError`` below via + # ``_wrap_kernel_exception``. + if self._kernel_session is None: + raise InterfaceError("get_execution_result requires an open session.") try: - stream = async_exec.await_result() + handle = self._kernel_session.attach_async_statement(command_id.guid) + stream = handle.await_result() except Exception as exc: raise _wrap_kernel_exception("get_execution_result", exc) from exc - # The async-exec handle's role ends once it has produced the - # ``ResultStream`` — keeping it around (and tracked in - # ``_async_handles``) would leak the server-side - # ``ExecutedAsyncStatement`` until ``close_session`` swept it - # up, since ``KernelResultSet.close`` only closes the stream - # it wraps. Drop tracking and fire-and-forget the close. - with self._async_handles_lock: - self._async_handles.pop(command_id.guid, None) - stmt = self._async_statements.pop(command_id.guid, None) - self._closed_commands.add(command_id.guid) - try: - async_exec.close() - except Exception as exc: - logger.warning( - "Error closing async_exec after await_result for %s: %s", - command_id, - exc, - ) - # The parent Statement is no longer needed once the async handle - # has produced its ResultStream. Close to release server-side - # tracking; matches the sync path's eager Statement close. - if stmt is not None: - try: - stmt.close() - except Exception as exc: - logger.warning( - "Error closing async statement after await_result for %s: %s", - command_id, - exc, - ) # ``KernelResultSet.__init__`` calls ``arrow_schema()`` which # can raise — map that to PEP 249 too. try: @@ -697,7 +775,17 @@ def _make_result_set( ) -> "ResultSet": """Build a ``KernelResultSet`` from any kernel handle. Used by sync execute, ``get_execution_result``, and all metadata - paths to keep construction in one place.""" + paths to keep construction in one place. + + Sets ``cursor.active_command_id`` here so every result-producing + path — sync execute, async fetch, AND metadata — leaves the + cursor pointing at the command that produced the current result + set. This matches the Thrift backend, which sets it + unconditionally in ``_handle_execute_response``. Without it, + ``cursor.query_id`` / ``get_query_state`` would stay pinned to a + prior query after a metadata call (the metadata methods mint a + synthetic command id but previously never published it).""" + cursor.active_command_id = command_id return KernelResultSet( connection=cursor.connection, backend=self, @@ -746,8 +834,8 @@ def get_schemas( raise InterfaceError("get_schemas requires an open session.") try: stream = self._kernel_session.metadata().list_schemas( - catalog=catalog_name, - schema_pattern=schema_name, + catalog=_catalog_or_none(catalog_name), + schema_pattern=_none_if_blank(schema_name), ) return self._make_result_set(stream, cursor, self._synthetic_command_id()) except Exception as exc: @@ -767,45 +855,18 @@ def get_tables( if self._kernel_session is None: raise InterfaceError("get_tables requires an open session.") try: + # ``table_types`` is filtered kernel-side (the kernel applies + # it to the reshaped result, case-insensitively as of the + # batch-3 kernel change), so we forward it and let the kernel + # do the work — no connector-side drain + refilter. Passing it + # through preserves streaming for large schemas. stream = self._kernel_session.metadata().list_tables( - catalog=catalog_name, - schema_pattern=schema_name, - table_pattern=table_name, - table_types=table_types, - ) - if not table_types: - return self._make_result_set( - stream, cursor, self._synthetic_command_id() - ) - # The kernel today returns the unfiltered ``SHOW TABLES`` - # shape regardless of ``table_types``. Drain to a single - # Arrow table and apply the same client-side filter the - # native SEA backend uses. The filter is **case-sensitive** - # — matches the SEA backend's documented behaviour, and - # mirrors how the warehouse reports the values - # (``TABLE`` / ``VIEW`` / ``SYSTEM_TABLE`` — uppercase). - # Look the column up by name rather than positional index - # so a future kernel reshape of ``SHOW TABLES`` doesn't - # silently filter the wrong column. - from databricks.sql.backend.sea.utils.filters import ResultSetFilter - - full_table = _drain_kernel_handle(stream) - if "TABLE_TYPE" not in full_table.schema.names: - raise OperationalError( - "kernel get_tables result is missing a TABLE_TYPE " - f"column; got {full_table.schema.names!r}" - ) - filtered_table = ResultSetFilter._filter_arrow_table( - full_table, - column_name="TABLE_TYPE", - allowed_values=table_types, - case_sensitive=True, - ) - return self._make_result_set( - _StaticArrowHandle(filtered_table), - cursor, - self._synthetic_command_id(), + catalog=_catalog_or_none(catalog_name), + schema_pattern=_none_if_blank(schema_name), + table_pattern=_none_if_blank(table_name), + table_types=table_types if table_types else None, ) + return self._make_result_set(stream, cursor, self._synthetic_command_id()) except Exception as exc: raise _wrap_kernel_exception("get_tables", exc) from exc @@ -830,10 +891,10 @@ def get_columns( # Thrift backend's `getColumns(null, …)` behaviour from # the user's perspective. stream = self._kernel_session.metadata().list_columns( - catalog=catalog_name, - schema_pattern=schema_name, - table_pattern=table_name, - column_pattern=column_name, + catalog=_catalog_or_none(catalog_name), + schema_pattern=_none_if_blank(schema_name), + table_pattern=_none_if_blank(table_name), + column_pattern=_none_if_blank(column_name), ) return self._make_result_set(stream, cursor, self._synthetic_command_id()) except Exception as exc: @@ -1006,55 +1067,3 @@ def _read_pem_bytes(path: str, label: str) -> bytes: "kernel TLS config." ) return data - - -def _drain_kernel_handle(handle: Any) -> Any: - """Drain a kernel ResultStream / ExecutedStatement into a single - ``pyarrow.Table``. Used by ``get_tables`` to apply a client-side - ``table_types`` filter on a metadata result; cheap because - metadata streams are small.""" - import pyarrow - - schema = handle.arrow_schema() - batches = [] - while True: - batch = handle.fetch_next_batch() - if batch is None: - break - if batch.num_rows > 0: - batches.append(batch) - try: - handle.close() - except Exception: - # Non-fatal — the surrounding ``get_tables`` call has already - # captured the result data, and the handle's server-side - # state will be reaped by the kernel's Drop impl. - pass - return pyarrow.Table.from_batches(batches, schema=schema) - - -class _StaticArrowHandle: - """Duck-typed kernel handle that replays a pre-built - ``pyarrow.Table`` through ``arrow_schema()`` / - ``fetch_next_batch()`` / ``close()``. Used to wrap a - post-processed table (e.g., the ``table_types``-filtered output - of ``get_tables``) so it flows back through the normal - ``KernelResultSet`` path.""" - - def __init__(self, table: Any) -> None: - self._schema = table.schema - self._batches = list(table.to_batches()) - self._idx = 0 - - def arrow_schema(self) -> Any: - return self._schema - - def fetch_next_batch(self) -> Optional[Any]: - if self._idx >= len(self._batches): - return None - batch = self._batches[self._idx] - self._idx += 1 - return batch - - def close(self) -> None: - self._batches = [] diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index e66dd897c..7b94fc98d 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1590,10 +1590,9 @@ def columns( Names can contain % wildcards. - Note: on ``use_kernel=True``, ``catalog_name`` is required — - the kernel's underlying ``SHOW COLUMNS`` cannot span catalogs. - Passing ``catalog_name=None`` raises ``ProgrammingError``. The - Thrift and native SEA backends accept ``catalog_name=None``. + ``catalog_name=None`` is accepted on all backends and matches + columns across every catalog (the kernel issues ``SHOW COLUMNS`` + over all catalogs). :returns self """ diff --git a/tests/e2e/test_kernel_backend.py b/tests/e2e/test_kernel_backend.py index f25c60630..4c822caa8 100644 --- a/tests/e2e/test_kernel_backend.py +++ b/tests/e2e/test_kernel_backend.py @@ -21,9 +21,12 @@ from __future__ import annotations +from uuid import uuid4 + import pytest import databricks.sql as sql +from databricks.sql.backend.types import CommandState from databricks.sql.exc import ( DatabaseError, NotSupportedError, @@ -252,6 +255,165 @@ def test_metadata_columns(conn): assert len(rows) > 0 +# ── Metadata filter normalization (batch 3) ─────────────────────── + + +def test_schemas_with_empty_string_filter_matches_all(conn): + """An empty-string schema pattern normalizes to match-all rather + than raising ``ProgrammingError`` (kernel rejects ``""``) — locks + ``_none_if_blank`` on the pattern args.""" + with conn.cursor() as cur: + cur.schemas(catalog_name="main", schema_name="") + rows = cur.fetchall() + assert len(rows) > 0 + + +def test_tables_table_types_filter_is_case_insensitive(conn): + """Lowercase ``table_types=['view']`` / uppercase ``['TABLE']`` + each match the right object regardless of case — locks the + kernel-side case-insensitive ``table_types`` match (batch-3 kernel + B2) end-to-end plus the connector drain removal (the filter now + runs kernel-side, not client-side). + + Self-contained: creates a table + a view over it in the session's + default (writable) schema, scopes the lookup to a unique name + prefix, and drops both afterward — no dependency on which + workspace schemas happen to contain views.""" + sfx = str(uuid4()).replace("-", "_") + tbl = f"dbsql_kernel_tt_t_{sfx}" + vw = f"dbsql_kernel_tt_v_{sfx}" + name_pat = f"dbsql_kernel_tt_%_{sfx}" + with conn.cursor() as cur: + cur.execute("SELECT current_catalog(), current_schema()") + cat, sch = cur.fetchall()[0] + try: + cur.execute(f"CREATE TABLE {tbl} (n INT)") + cur.execute(f"CREATE VIEW {vw} AS SELECT * FROM {tbl}") + + def _names_and_types(): + rows = cur.fetchall() + cols = [d[0] for d in cur.description] + ni, ti = cols.index("TABLE_NAME"), cols.index("TABLE_TYPE") + return {(r[ni], r[ti]) for r in rows} + + # Lowercase 'view' must match the VIEW (and only it). + cur.tables( + catalog_name=cat, + schema_name=sch, + table_name=name_pat, + table_types=["view"], + ) + assert _names_and_types() == {(vw, "VIEW")} + + # Uppercase 'TABLE' must match the TABLE (and only it). + cur.tables( + catalog_name=cat, + schema_name=sch, + table_name=name_pat, + table_types=["TABLE"], + ) + assert _names_and_types() == {(tbl, "TABLE")} + finally: + cur.execute(f"DROP VIEW IF EXISTS {vw}") + cur.execute(f"DROP TABLE IF EXISTS {tbl}") + + +# ── Cursor-state tracking (batch 3) ─────────────────────────────── + + +def test_metadata_call_publishes_active_command_id(conn): + """A metadata call leaves the cursor pointing at the command that + produced the current result set (Thrift parity) — ``query_id`` is + populated rather than stale/None after ``catalogs()``.""" + with conn.cursor() as cur: + cur.catalogs() + cur.fetchall() + assert cur.active_command_id is not None + assert cur.query_id is not None + + +def test_dml_rowcount_wiring_does_not_break_dml(conn): + """The ``num_modified_rows`` → ``cursor.rowcount`` wiring must not + break DML execution, and ``rowcount`` is a well-formed int. + + The affected-row count itself is only surfaced when the warehouse + reports ``num_modified_rows`` (absent on some warehouses, including + parts of dogfood — then ``rowcount`` stays at its ``-1`` default, + matching the Thrift backend). The positive-count mapping is locked + by the unit test; here we assert the path runs end-to-end and the + rows really landed. Self-contained in the writable default schema.""" + sfx = str(uuid4()).replace("-", "_") + tbl = f"dbsql_kernel_rc_{sfx}" + with conn.cursor() as cur: + try: + cur.execute(f"CREATE TABLE {tbl} (n INT)") + cur.execute(f"INSERT INTO {tbl} VALUES (1), (2), (3)") + # rowcount is a real int (>= -1); never a MagicMock / None / + # crash from the getattr wiring. + assert isinstance(cur.rowcount, int) + assert ( + cur.rowcount == 3 or cur.rowcount == -1 + ), f"unexpected rowcount {cur.rowcount!r}" + # The INSERT genuinely modified the table. + cur.execute(f"SELECT COUNT(*) FROM {tbl}") + assert cur.fetchall()[0][0] == 3 + finally: + cur.execute(f"DROP TABLE IF EXISTS {tbl}") + + +# ── Async execution: state + result come from the server (attach-by-id) ── + + +def test_async_execute_polls_and_fetches_result(conn): + """The full async CUJ: ``execute_async`` → poll + ``get_query_state`` → ``get_async_execution_result``. State and + result are read from the server by re-attaching to the statement + id (no connector-side state).""" + with conn.cursor() as cur: + cur.execute_async("SELECT 7 AS n") + cur.get_async_execution_result() # polls to terminal, fetches + rows = cur.fetchall() + assert rows[0][0] == 7 + assert cur.get_query_state() in ( + CommandState.SUCCEEDED, + CommandState.CLOSED, + ) + + +def test_async_get_execution_result_is_re_callable(conn): + """``get_async_execution_result`` re-attaches by id on each call, + so fetching the same async command twice both succeed — the + connector never relied on a one-shot retained handle (Thrift-parity + re-fetch).""" + with conn.cursor() as cur: + cur.execute_async("SELECT 11 AS n") + cur.get_async_execution_result() + first = cur.fetchall() + # Second fetch of the same command must also succeed. + cur.get_async_execution_result() + second = cur.fetchall() + assert first[0][0] == 11 + assert second[0][0] == 11 + + +def test_async_result_resumable_from_a_fresh_cursor(conn): + """The async command id is the only thing needed to fetch the + result — a *different* cursor that never submitted it can adopt the + id and retrieve the result, proving state/results come from the + server (attach-by-id), not connector-side per-cursor tracking.""" + with conn.cursor() as submitter: + submitter.execute_async("SELECT 13 AS n") + command_id = submitter.active_command_id + assert command_id is not None + + # A brand-new cursor adopts the id and fetches. + with conn.cursor() as resumer: + resumer.active_command_id = command_id + resumer.get_async_execution_result() + rows = resumer.fetchall() + assert rows[0][0] == 13 + + # ── Session configuration ───────────────────────────────────────── diff --git a/tests/unit/test_kernel_client.py b/tests/unit/test_kernel_client.py index 42ac36197..44ed42781 100644 --- a/tests/unit/test_kernel_client.py +++ b/tests/unit/test_kernel_client.py @@ -273,6 +273,14 @@ def test_no_open_session_guards_raise_interface_error(): **kwargs, ) + # The async-state methods attach to the kernel session by id; they + # must also guard a closed/absent session before the kernel call. + cid = CommandId.from_sea_statement_id("any-id") + with pytest.raises(InterfaceError): + c.get_query_state(cid) + with pytest.raises(InterfaceError): + c.get_execution_result(cid, cursor=cursor) + def test_open_session_rejects_double_open(monkeypatch): """Two ``open_session`` calls on the same client must fail — @@ -692,24 +700,85 @@ def test_close_command_tolerant_when_handle_missing(): c.close_command(fake_command_id) # must not raise -def test_get_query_state_returns_succeeded_when_handle_missing(): - """Sync-execute paths never register an async handle; by the - time ``get_query_state`` could be called the command is - terminal-by-construction. The client returns SUCCEEDED so the - cursor's polling loop terminates cleanly.""" +def _attach_returns(c, *, status=None, status_error=None, await_result=None): + """Wire ``_kernel_session.attach_async_statement`` to return a fake + handle. ``status`` is a ``(state, failure)`` tuple for ``handle.status()``; + ``status_error`` makes ``attach`` itself raise (e.g. NotFound); + ``await_result`` sets ``handle.await_result()`` return value.""" + c._kernel_session = MagicMock() + if status_error is not None: + c._kernel_session.attach_async_statement.side_effect = status_error + return None + handle = MagicMock() + if status is not None: + handle.status.return_value = status + if await_result is not None: + handle.await_result.return_value = await_result + c._kernel_session.attach_async_statement.return_value = handle + return handle + + +def test_get_query_state_returns_succeeded_when_server_404s(): + """An id the server doesn't recognise (sync command whose id was + never a standalone server statement, or an async command closed and + aged out of the result TTL) surfaces as a NotFound KernelError from + ``attach``; the client maps that to SUCCEEDED so the cursor's + polling loop terminates cleanly.""" c = _make_client() - fake_command_id = CommandId.from_sea_statement_id("sync-only") - assert c.get_query_state(fake_command_id) == CommandState.SUCCEEDED + _attach_returns(c, status_error=_FakeKernelError(code="NotFound")) + cid = CommandId.from_sea_statement_id("sync-only") + assert c.get_query_state(cid) == CommandState.SUCCEEDED + + +def test_get_query_state_returns_closed_from_server(): + """A closed-but-not-yet-GC'd async command: the server still answers + GetStatementStatus with state=CLOSED (200), which flows through the + state map to ``CommandState.CLOSED`` — no connector-side + closed-state bookkeeping.""" + c = _make_client() + _attach_returns(c, status=("Closed", None)) + cid = CommandId.from_sea_statement_id("closed-async") + assert c.get_query_state(cid) == CommandState.CLOSED + + +def test_get_query_state_propagates_non_not_found_error(): + """A transient/other error from ``attach`` (NOT NotFound) must not be + silently swallowed as a terminal state — it propagates as a mapped + PEP 249 exception so the caller can retry / surface it.""" + c = _make_client() + _attach_returns(c, status_error=_FakeKernelError(code="Unavailable")) + cid = CommandId.from_sea_statement_id("flaky") + with pytest.raises(DatabaseError): + c.get_query_state(cid) + + +def test_get_execution_result_attaches_by_id(): + """``get_execution_result`` re-attaches to the statement by id and + awaits its result — no connector-side handle lookup.""" + c = _make_client() + fake_stream = MagicMock() + fake_stream.arrow_schema.return_value = pa.schema([("n", pa.int64())]) + handle = _attach_returns(c, await_result=fake_stream) + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cid = CommandId.from_sea_statement_id("async-1") + + rs = c.get_execution_result(cid, cursor=cursor) + + assert rs is not None + c._kernel_session.attach_async_statement.assert_called_with("async-1") + handle.await_result.assert_called_once_with() -def test_get_execution_result_raises_for_unknown_command_id(): - """The kernel backend only tracks async-submitted statements; - a ``get_execution_result`` call for an unknown id is a - programming error.""" +def test_get_execution_result_maps_not_found_to_programming_error(): + """An unknown / aged-out id surfaces the kernel's NotFound as a + mapped PEP 249 exception rather than a raw error.""" c = _make_client() - fake_command_id = CommandId.from_sea_statement_id("unknown") - with pytest.raises(ProgrammingError, match="unknown command_id"): - c.get_execution_result(fake_command_id, cursor=MagicMock()) + _attach_returns(c, status_error=_FakeKernelError(code="NotFound")) + cid = CommandId.from_sea_statement_id("gone") + with pytest.raises(DatabaseError): + c.get_execution_result(cid, cursor=MagicMock()) def test_cancel_command_reraises_kernel_error(): @@ -737,13 +806,10 @@ def test_close_command_reraises_kernel_error(): def test_get_query_state_raises_on_failed_state_with_failure(): c = _make_client() - fake_handle = MagicMock() - fake_handle.status.return_value = ( - "Failed", - _FakeKernelError(code="SqlError", message="bad"), + _attach_returns( + c, status=("Failed", _FakeKernelError(code="SqlError", message="bad")) ) cid = CommandId.from_sea_statement_id("abc") - c._async_handles[cid.guid] = fake_handle with pytest.raises(DatabaseError, match="bad"): c.get_query_state(cid) @@ -756,12 +822,10 @@ def test_get_query_state_handles_non_baseexception_failure(): ``TypeError: exception causes must derive from BaseException``; the wrap helper deals with it.""" c = _make_client() - fake_handle = MagicMock() # ``failure`` is a plain dict (not BaseException) — simulates a # kernel binding that exposes the failure as a structured value. - fake_handle.status.return_value = ("Failed", {"code": "Internal", "msg": "weird"}) + _attach_returns(c, status=("Failed", {"code": "Internal", "msg": "weird"})) cid = CommandId.from_sea_statement_id("xyz") - c._async_handles[cid.guid] = fake_handle # Must surface as a PEP 249 exception (OperationalError via the # wrap helper's fallback path), not TypeError. with pytest.raises(OperationalError): @@ -770,10 +834,8 @@ def test_get_query_state_handles_non_baseexception_failure(): def test_get_query_state_returns_state_when_no_failure(): c = _make_client() - fake_handle = MagicMock() - fake_handle.status.return_value = ("Running", None) + _attach_returns(c, status=("Running", None)) cid = CommandId.from_sea_statement_id("abc") - c._async_handles[cid.guid] = fake_handle assert c.get_query_state(cid) == CommandState.RUNNING @@ -817,36 +879,45 @@ def test_close_session_clears_async_handles_even_if_close_fails(): assert bad.close.called -def test_close_session_marks_swept_handles_as_closed(): - """Close-session pre-populates ``_closed_commands`` for every - swept async handle so a subsequent ``get_query_state`` reports - ``CLOSED`` instead of falling through to the SUCCEEDED - sync-default.""" +def test_close_session_closes_and_drops_swept_handles(): + """Close-session closes every tracked async handle (firing its + server-side CloseStatement) and drops it from the keep-alive map. + There is no connector-side closed-state bookkeeping — a subsequent + ``get_query_state`` re-attaches by id and reads CLOSED from the + server.""" c = _make_client() handle = MagicMock() cid = CommandId.from_sea_statement_id("xyz") c._async_handles[cid.guid] = handle c._kernel_session = MagicMock() c.close_session(MagicMock()) - assert cid.guid in c._closed_commands + assert handle.close.called + assert c._async_handles == {} # --------------------------------------------------------------------------- -# CLOSED command-state for previously-tracked async handles (m3) +# CLOSED command-state comes from the server (re-attach by id) # --------------------------------------------------------------------------- def test_get_query_state_returns_closed_after_close_command(): - """After ``close_command`` on a tracked async handle, the - subsequent ``get_query_state`` lookup must report ``CLOSED``, - not fall through to the SUCCEEDED sync-default — the command - was tracked then closed; SUCCEEDED would lie about its history.""" + """After ``close_command`` fires the server-side CloseStatement, a + subsequent ``get_query_state`` re-attaches by id and the server + reports CLOSED (200 state=CLOSED until the result TTL elapses). No + connector-side closed-state tracking — the server is the source of + truth.""" c = _make_client() + c._kernel_session = MagicMock() handle = MagicMock() cid = CommandId.from_sea_statement_id("async-1") c._async_handles[cid.guid] = handle c.close_command(cid) assert handle.close.called + + # Re-attach now reports CLOSED from the server. + attached = MagicMock() + attached.status.return_value = ("Closed", None) + c._kernel_session.attach_async_statement.return_value = attached assert c.get_query_state(cid) == CommandState.CLOSED @@ -926,72 +997,49 @@ def test_kernel_error_during_result_set_construction_is_mapped(): # --------------------------------------------------------------------------- -# Async leak in get_execution_result (M1) +# get_execution_result is re-callable via attach-by-id # --------------------------------------------------------------------------- -def test_get_execution_result_closes_async_exec_and_drops_tracking(): - """The ``ExecutedAsyncStatement`` handle's role ends once it - produces a ``ResultStream`` via ``await_result()``. The client - must close the async_exec and drop the tracking entry there — - otherwise ``KernelResultSet.close()`` (which only closes the - stream) leaves the executed handle leaked server-side until - ``close_session`` sweeps.""" +def test_get_execution_result_is_re_callable(): + """``get_execution_result`` re-attaches by id on every call, so a + second fetch for the same async command succeeds (Thrift-parity + re-fetch). Each call attaches a fresh handle and awaits its result; + neither raises, and the connector never depended on a retained + handle. The kernel's ``await_result()`` is idempotent server-side.""" c = _make_client() c._kernel_session = MagicMock() - async_exec = MagicMock() fake_stream = MagicMock() fake_stream.arrow_schema.return_value = pa.schema([("n", pa.int64())]) - async_exec.await_result.return_value = fake_stream - cid = CommandId.from_sea_statement_id("async-leak-test") - c._async_handles[cid.guid] = async_exec + handle = MagicMock() + handle.await_result.return_value = fake_stream + c._kernel_session.attach_async_statement.return_value = handle + cid = CommandId.from_sea_statement_id("async-recall-twice") cursor = MagicMock() cursor.arraysize = 100 cursor.buffer_size_bytes = 1024 - c.get_execution_result(cid, cursor=cursor) + rs1 = c.get_execution_result(cid, cursor=cursor) + rs2 = c.get_execution_result(cid, cursor=cursor) - # async_exec must be closed and dropped from tracking; the - # closed-commands set records it. - assert async_exec.close.called - assert cid.guid not in c._async_handles - assert cid.guid in c._closed_commands - - -def test_get_execution_result_does_not_raise_on_async_exec_close_failure(): - """A failure to close the async_exec is non-fatal — the result - stream has already been returned by ``await_result()`` and the - kernel's Drop will reap server-side state.""" - c = _make_client() - c._kernel_session = MagicMock() - async_exec = MagicMock() - fake_stream = MagicMock() - fake_stream.arrow_schema.return_value = pa.schema([("n", pa.int64())]) - async_exec.await_result.return_value = fake_stream - async_exec.close.side_effect = _FakeKernelError(code="Unavailable") - cid = CommandId.from_sea_statement_id("async-close-fail") - c._async_handles[cid.guid] = async_exec - cursor = MagicMock() - cursor.arraysize = 100 - cursor.buffer_size_bytes = 1024 - - # Must not raise. - rs = c.get_execution_result(cid, cursor=cursor) - assert rs is not None - assert cid.guid not in c._async_handles + assert rs1 is not None and rs2 is not None + # Two calls -> two attaches -> two await_results. No reliance on a + # connector-tracked handle. + assert c._kernel_session.attach_async_statement.call_count == 2 + assert handle.await_result.call_count == 2 # --------------------------------------------------------------------------- -# get_tables table_types client-side filter (m2) +# get_tables — table_types is filtered kernel-side (no connector drain) # --------------------------------------------------------------------------- def _make_tables_stream() -> MagicMock: """Build a fake stream that mimics the kernel's ``list_tables`` - output shape (5 cols ending in TABLE_TYPE at index 5 — the - connector matches what SEA produces, which has 5 metadata cols - before TABLE_TYPE). Returns a fixed table with mixed table types - so the filter has something to discriminate.""" + output shape. The kernel applies the ``table_types`` filter + itself, so the connector now forwards ``table_types`` and returns + this stream unchanged — these tests mock the kernel filter away + and only assert the forwarded args + pass-through behaviour.""" schema = pa.schema( [ ("TABLE_CAT", pa.string()), @@ -1021,12 +1069,15 @@ def _make_tables_stream() -> MagicMock: return stream -def test_get_tables_with_table_types_filters_rows(): +def test_get_tables_forwards_table_types_to_kernel(): + """``table_types`` is forwarded verbatim to the kernel's + ``list_tables`` (which filters case-insensitively) — the + connector no longer drains + refilters client-side. The stream + flows back through the normal ``KernelResultSet`` path unchanged.""" c = _make_client() c._kernel_session = MagicMock() - c._kernel_session.metadata.return_value.list_tables.return_value = ( - _make_tables_stream() - ) + list_tables = c._kernel_session.metadata.return_value.list_tables + list_tables.return_value = _make_tables_stream() cursor = MagicMock() cursor.arraysize = 100 cursor.buffer_size_bytes = 1024 @@ -1036,21 +1087,28 @@ def test_get_tables_with_table_types_filters_rows(): max_rows=10, max_bytes=1, cursor=cursor, - table_types=["TABLE"], + table_types=["view"], ) + + list_tables.assert_called_once_with( + catalog=None, + schema_pattern=None, + table_pattern=None, + table_types=["view"], + ) + # Stream is returned as-is — no connector-side row filtering. The + # mock kernel doesn't filter, so all three rows pass through. table = rs.fetchall_arrow() - assert table.num_rows == 2 - assert set(table.column("TABLE_TYPE").to_pylist()) == {"TABLE"} + assert table.num_rows == 3 -def test_get_tables_without_table_types_returns_full_stream(): - """No filter → kernel result flows through unchanged via the - normal ``KernelResultSet`` path (no drain-and-rewrap).""" +def test_get_tables_without_table_types_passes_none(): + """No filter → ``table_types=None`` forwarded; stream flows + through unchanged via the normal ``KernelResultSet`` path.""" c = _make_client() c._kernel_session = MagicMock() - c._kernel_session.metadata.return_value.list_tables.return_value = ( - _make_tables_stream() - ) + list_tables = c._kernel_session.metadata.return_value.list_tables + list_tables.return_value = _make_tables_stream() cursor = MagicMock() cursor.arraysize = 100 cursor.buffer_size_bytes = 1024 @@ -1062,10 +1120,313 @@ def test_get_tables_without_table_types_returns_full_stream(): cursor=cursor, table_types=None, ) + + list_tables.assert_called_once_with( + catalog=None, + schema_pattern=None, + table_pattern=None, + table_types=None, + ) table = rs.fetchall_arrow() assert table.num_rows == 3 +# --------------------------------------------------------------------------- +# Cursor-state tracking (T7) — active_command_id consistency +# --------------------------------------------------------------------------- + + +def _stream_with_schema() -> MagicMock: + """A minimal fake kernel handle whose ``arrow_schema()`` returns a + real schema so ``KernelResultSet.__init__`` succeeds.""" + stream = MagicMock() + stream.arrow_schema.return_value = pa.schema([("x", pa.int64())]) + return stream + + +def test_metadata_call_sets_active_command_id(): + """Metadata methods mint a synthetic command id and must publish + it on the cursor (Thrift sets ``active_command_id`` unconditionally + in ``_handle_execute_response``). Without this, ``cursor.query_id`` + would stay pinned to a prior query after a metadata browse.""" + c = _make_client() + c._kernel_session = MagicMock() + c._kernel_session.metadata.return_value.list_catalogs.return_value = ( + _stream_with_schema() + ) + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cursor.active_command_id = None + + c.get_catalogs(session_id=MagicMock(), max_rows=1, max_bytes=1, cursor=cursor) + + assert cursor.active_command_id is not None + # The synthetic id is a UUID-shaped guid the cursor can attribute + # logs to (rather than a stale prior-query id). + assert cursor.active_command_id.guid + + +def test_sync_execute_sets_active_command_id(): + """A successful sync execute publishes the server statement id on + the cursor.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cursor.active_command_id = None + + stmt = MagicMock() + stmt.execute.return_value = MagicMock( + statement_id="stmt-abc", + num_modified_rows=None, + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert cursor.active_command_id is not None + assert cursor.active_command_id.guid == "stmt-abc" + + +def test_failed_sync_execute_sets_active_command_id_from_canceller(): + """When execute() fails *after* the server assigned a statement id, + the canceller's inflight slot holds that id. The connector reads it + and publishes ``active_command_id`` before re-raising, so the cursor + can correlate the failed query (telemetry / log lookup).""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cursor.active_command_id = None + + canceller = MagicMock() + canceller.statement_id.return_value = "failed-stmt-id" + stmt = MagicMock() + stmt.canceller.return_value = canceller + stmt.execute.side_effect = RuntimeError("boom after id assigned") + c._kernel_session.statement.return_value = stmt + + with pytest.raises(Exception): + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert cursor.active_command_id is not None + assert cursor.active_command_id.guid == "failed-stmt-id" + + +def test_failed_sync_execute_leaves_active_command_id_untouched_when_no_id(): + """A pre-id transport failure (canceller has no statement id yet) + must leave ``active_command_id`` untouched — there's nothing to + correlate, and clobbering it would lie about cursor state.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + sentinel = object() + cursor.active_command_id = sentinel + + canceller = MagicMock() + canceller.statement_id.return_value = None # no id observed yet + stmt = MagicMock() + stmt.canceller.return_value = canceller + stmt.execute.side_effect = RuntimeError("connect refused") + c._kernel_session.statement.return_value = stmt + + with pytest.raises(Exception): + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert cursor.active_command_id is sentinel + + +def test_sync_execute_forwards_num_modified_rows_to_rowcount(): + """DML reports a real ``cursor.rowcount`` from the kernel's + ``num_modified_rows`` instead of the hardcoded ``-1``.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cursor.rowcount = -1 + + stmt = MagicMock() + stmt.execute.return_value = MagicMock( + statement_id="dml-1", + num_modified_rows=42, + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="INSERT INTO t VALUES (1)", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert cursor.rowcount == 42 + + +def test_sync_execute_leaves_rowcount_default_when_num_modified_rows_none(): + """SELECT (and warehouses that don't report it) → ``None`` leaves + ``rowcount`` at its ``-1`` default.""" + c = _make_client() + c._kernel_session = MagicMock() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + cursor.rowcount = -1 + + stmt = MagicMock() + stmt.execute.return_value = MagicMock( + statement_id="select-1", + num_modified_rows=None, + arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])), + ) + c._kernel_session.statement.return_value = stmt + + c.execute_command( + operation="SELECT 1", + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + lz4_compression=False, + cursor=cursor, + use_cloud_fetch=False, + parameters=[], + async_op=False, + enforce_embedded_schema_correctness=False, + ) + + assert cursor.rowcount == -1 + + +# --------------------------------------------------------------------------- +# Metadata filter normalization — wildcard catalog + empty-string patterns +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("wildcard", ["%", "*", "", " "]) +def test_get_columns_normalizes_wildcard_catalog_to_none(wildcard): + """``catalog_name`` of ``%``/``*``/blank → ``None`` (all-catalogs), + matching JDBC exact-or-all semantics and keeping the three metadata + methods symmetric.""" + c = _make_client() + c._kernel_session = MagicMock() + list_columns = c._kernel_session.metadata.return_value.list_columns + list_columns.return_value = _stream_with_schema() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + c.get_columns( + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + cursor=cursor, + catalog_name=wildcard, + schema_name="s", + table_name="t", + column_name="c", + ) + + list_columns.assert_called_once_with( + catalog=None, + schema_pattern="s", + table_pattern="t", + column_pattern="c", + ) + + +def test_get_schemas_normalizes_blank_pattern_to_none(): + """An empty-string schema pattern → ``None`` (match-all), mapping + the kernel's ``InvalidArgument``-on-``""`` to Thrift's effective + match-all. ``%``/``*`` stay as real LIKE wildcards on patterns.""" + c = _make_client() + c._kernel_session = MagicMock() + list_schemas = c._kernel_session.metadata.return_value.list_schemas + list_schemas.return_value = _stream_with_schema() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + c.get_schemas( + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + cursor=cursor, + catalog_name="main", + schema_name="", + ) + + list_schemas.assert_called_once_with(catalog="main", schema_pattern=None) + + +def test_get_schemas_keeps_wildcard_pattern(): + """A ``%`` schema pattern is a real LIKE wildcard — passed through, + NOT normalized to None.""" + c = _make_client() + c._kernel_session = MagicMock() + list_schemas = c._kernel_session.metadata.return_value.list_schemas + list_schemas.return_value = _stream_with_schema() + cursor = MagicMock() + cursor.arraysize = 100 + cursor.buffer_size_bytes = 1024 + + c.get_schemas( + session_id=MagicMock(), + max_rows=1, + max_bytes=1, + cursor=cursor, + catalog_name="main", + schema_name="prod_%", + ) + + list_schemas.assert_called_once_with(catalog="main", schema_pattern="prod_%") + + # --------------------------------------------------------------------------- # TLS translation: SSLOptions -> kernel Session tls_* kwargs. # ---------------------------------------------------------------------------