From 379fbea855891905ce6d9b29f8fe34ea8bb05848 Mon Sep 17 00:00:00 2001 From: msvargas Date: Mon, 15 Jun 2026 16:25:41 -0500 Subject: [PATCH 1/2] fix(tanstack-react-query): use skipToken instead of overriding enabled option Replace the enabled: streamsHaveSynced pattern with TanStack's skipToken to preserve the user's own enabled option. Problems fixed: - User's enabled option was silently overridden by PowerSync's streamsHaveSynced - usePowerSyncQueries final useMemo was missing streamsHaveSynced in deps, returning stale values - Race condition: change listeners attached watching [] tables while resolveTables was pending, losing first-sync writes Changes: - useQuery: Conditionally pass skipToken when streams haven't synced; suspense queries always get the real queryFn - useQueries: Same skipToken approach per query entry; add streamsHaveSynced to deps - usePowerSyncQueries: Add tablesInitialized ref to rescue data lost during the []->[tables] transition; add streamsHaveSynced to return memo deps --- .../src/hooks/usePowerSyncQueries.ts | 20 ++- .../src/hooks/useQueries.ts | 8 +- .../src/hooks/useQuery.ts | 9 +- .../tests/enabled.test.tsx | 100 ++++++++++++ .../tests/usePowerSyncQueries.test.tsx | 143 ++++++++++++++++++ 5 files changed, 271 insertions(+), 9 deletions(-) create mode 100644 packages/tanstack-react-query/tests/enabled.test.tsx create mode 100644 packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx diff --git a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts index 9a4c7f877..c910b2358 100644 --- a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts +++ b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts @@ -1,6 +1,6 @@ import { type CompilableQuery, parseQuery } from '@powersync/common'; import { QuerySyncStreamOptions, useAllSyncStreamsHaveSynced, usePowerSync } from '@powersync/react'; -import { useEffect, useState, useCallback, useMemo } from 'react'; +import { useEffect, useState, useCallback, useMemo, useRef } from 'react'; import * as Tanstack from '@tanstack/react-query'; export type UsePowerSyncQueriesInput = { @@ -111,7 +111,17 @@ export function usePowerSyncQueries( })) ); + // Tracks the []->[tables] transition to rescue data lost during pending resolveTables. + const tablesInitialized = useRef([]); + + if (tablesInitialized.current.length !== parsedQueries.length) { + tablesInitialized.current = parsedQueries.map(() => false); + } + useEffect(() => { + // Re-arm rescue tracking when queries change. + tablesInitialized.current = parsedQueries.map(() => false); + const listeners = parsedQueries.map((pq, idx) => { if (pq.parseError || !pq.query) { return null; @@ -154,6 +164,12 @@ export function usePowerSyncQueries( const abort = new AbortController(); + // Rescue data lost while resolveTables was pending (listener watched [] tables). + if (tablesArr[idx]?.length > 0 && !tablesInitialized.current[idx]) { + tablesInitialized.current[idx] = true; + queryClient.invalidateQueries({ queryKey: pq.queryKey }); + } + powerSync.onChangeWithCallback( { onChange: () => { @@ -203,5 +219,5 @@ export function usePowerSyncQueries( }), streamsHaveSynced }; - }, [parsedQueries, errorsArr, tablesArr, powerSync]); + }, [parsedQueries, errorsArr, tablesArr, powerSync, streamsHaveSynced]); } diff --git a/packages/tanstack-react-query/src/hooks/useQueries.ts b/packages/tanstack-react-query/src/hooks/useQueries.ts index 4798e568e..52e210877 100644 --- a/packages/tanstack-react-query/src/hooks/useQueries.ts +++ b/packages/tanstack-react-query/src/hooks/useQueries.ts @@ -151,15 +151,15 @@ export function useQueries( return queriesInput.map((queryOptions, idx) => { const { query, parameters, ...rest } = queryOptions; const state = states[idx]; + const queryFn = streamsHaveSynced ? (query ? state.queryFn : rest.queryFn) : Tanstack.skipToken; return { ...rest, - queryFn: query ? state.queryFn : rest.queryFn, - queryKey: rest.queryKey, - enabled: streamsHaveSynced + queryFn, + queryKey: rest.queryKey }; }); - }, [queriesInput, states]); + }, [queriesInput, states, streamsHaveSynced]); return Tanstack.useQueries( { diff --git a/packages/tanstack-react-query/src/hooks/useQuery.ts b/packages/tanstack-react-query/src/hooks/useQuery.ts index ace5ff63e..e1b9a38ec 100644 --- a/packages/tanstack-react-query/src/hooks/useQuery.ts +++ b/packages/tanstack-react-query/src/hooks/useQuery.ts @@ -135,7 +135,7 @@ function useQueryCore< const { query, parameters, queryKey, streams, ...resolvedOptions } = options; const { - queries: [{ queryFn }], + queries: [{ queryFn: powerSyncQueryFn }], streamsHaveSynced } = usePowerSyncQueries( [ @@ -149,12 +149,15 @@ function useQueryCore< queryClient ); + const isSuspense = useQueryFn === Tanstack.useSuspenseQuery; + const resolvedQueryFn = query ? powerSyncQueryFn : resolvedOptions.queryFn; + const queryFn = streamsHaveSynced || isSuspense ? resolvedQueryFn : Tanstack.skipToken; + return useQueryFn( { ...(resolvedOptions as TQueryOptions), queryKey, - queryFn: query ? queryFn : resolvedOptions.queryFn, - enabled: streamsHaveSynced + ...((typeof queryFn === 'function' || queryFn === Tanstack.skipToken) && { queryFn }) } as TQueryOptions, queryClient ); diff --git a/packages/tanstack-react-query/tests/enabled.test.tsx b/packages/tanstack-react-query/tests/enabled.test.tsx new file mode 100644 index 000000000..644a25e91 --- /dev/null +++ b/packages/tanstack-react-query/tests/enabled.test.tsx @@ -0,0 +1,100 @@ +import { cleanup, renderHook, waitFor } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import React, { Suspense } from 'react'; +import { AbstractPowerSyncDatabase } from '@powersync/common'; +import { PowerSyncContext } from '@powersync/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { openPowerSync } from './utils'; +import { useQuery } from '../src/hooks/useQuery'; +import { useQueries } from '../src/hooks/useQueries'; +import { useSuspenseQuery } from '../src/hooks/useQuery'; + +describe('user enabled option is respected', () => { + let db: AbstractPowerSyncDatabase; + let queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false + } + } + }); + + beforeEach(() => { + db = openPowerSync(); + queryClient.clear(); + vi.clearAllMocks(); + cleanup(); + }); + + const wrapper = ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); + + it('useQuery: does not run when user passes enabled: false (no streams)', async () => { + const getAllSpy = vi.spyOn(db, 'getAll'); + + const { result } = renderHook( + () => + useQuery({ + queryKey: ['lists'], + query: 'SELECT * FROM lists', + enabled: false + }), + { wrapper } + ); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(result.current.status).toBe('pending'); + expect(result.current.fetchStatus).toBe('idle'); + expect(result.current.data).toBeUndefined(); + expect(getAllSpy).not.toHaveBeenCalledWith('SELECT * FROM lists', expect.anything()); + }); + + it('useQueries: does not run an entry where user passes enabled: false (no streams)', async () => { + const getAllSpy = vi.spyOn(db, 'getAll'); + + const { result } = renderHook( + () => + useQueries({ + queries: [{ queryKey: ['lists'], query: 'SELECT * FROM lists', enabled: false }] + }), + { wrapper } + ); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(result.current[0].status).toBe('pending'); + expect(result.current[0].fetchStatus).toBe('idle'); + expect(result.current[0].data).toBeUndefined(); + expect(getAllSpy).not.toHaveBeenCalledWith('SELECT * FROM lists', expect.anything()); + }); + + it('useSuspenseQuery: with an unsynced waitForStream stream does not error with skipToken and still resolves', async () => { + const consoleErrorSpy = vi.spyOn(console, 'error'); + + const suspenseWrapper = ({ children }: { children: React.ReactNode }) => ( + + + loading}>{children} + + + ); + + const { result } = renderHook( + () => + useSuspenseQuery({ + queryKey: ['suspense-lists'], + query: 'SELECT * FROM lists', + streams: [{ name: 'a', waitForStream: true }] + }), + { wrapper: suspenseWrapper } + ); + + await waitFor(() => expect(result.current.data).toBeDefined(), { timeout: 1000, interval: 100 }); + + expect(consoleErrorSpy).not.toHaveBeenCalledWith('skipToken is not allowed for useSuspenseQuery'); + }); +}); diff --git a/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx new file mode 100644 index 000000000..1d28ac0e5 --- /dev/null +++ b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx @@ -0,0 +1,143 @@ +import { cleanup, renderHook, waitFor } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import React from 'react'; +import { AbstractPowerSyncDatabase, SyncStatus } from '@powersync/common'; +import { PowerSyncContext } from '@powersync/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { openPowerSync } from './utils'; +import * as Tanstack from '@tanstack/react-query'; +import { usePowerSyncQueries } from '../src/hooks/usePowerSyncQueries'; + +describe('usePowerSyncQueries bug fixes', () => { + let db: AbstractPowerSyncDatabase; + let queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false + } + } + }); + + beforeEach(() => { + db = openPowerSync(); + queryClient.clear(); + vi.clearAllMocks(); + cleanup(); + }); + + const wrapper = ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); + + const syncedStatus = () => + new SyncStatus({ + dataFlow: { + internalStreamSubscriptions: [ + { + name: 'a', + parameters: null, + progress: { total: 0, downloaded: 0 }, + active: true, + is_default: false, + has_explicit_subscription: true, + expires_at: null, + last_synced_at: 1234, + priority: 1 + } + ] + } + }); + + describe('Bug 1: streamsHaveSynced in final useMemo deps', () => { + it('updated returned streamsHaveSynced once a waitForStream stream syncs', async () => { + const stableQueries = [ + { + query: 'SELECT name FROM lists', + queryKey: ['bug1'], + streams: [{ name: 'a', waitForStream: true }] + } + ]; + + const { result, unmount } = renderHook(() => usePowerSyncQueries(stableQueries, queryClient), { wrapper }); + + await waitFor(() => expect(result.current.streamsHaveSynced).toBe(false), { timeout: 1000, interval: 50 }); + + db.currentStatus = syncedStatus(); + db.iterateListeners((l) => l.statusChanged?.(syncedStatus())); + + await waitFor(() => expect(result.current.streamsHaveSynced).toBe(true), { timeout: 1000, interval: 50 }); + + await waitFor(() => expect(result.current.queries[0].tables).toContain('lists'), { + timeout: 1000, + interval: 50 + }); + + unmount(); + }); + }); + + describe('Bug 2: first table-resolution race loses first-sync data', () => { + it('reflects rows written while table resolution is still on the slow path', async () => { + let releaseResolve!: () => void; + const resolveGate = new Promise((resolve) => { + releaseResolve = resolve; + }); + + const realResolveTables = db.resolveTables.bind(db); + const resolveSpy = vi.spyOn(db, 'resolveTables').mockImplementation(async (sql, params) => { + await resolveGate; + return realResolveTables(sql, params); + }); + + const realRegisterListener = db.registerListener.bind(db); + const registerListenerSpy = vi + .spyOn(db, 'registerListener') + .mockImplementation((listener: Parameters[0]) => { + if (listener && (listener as { schemaChanged?: unknown }).schemaChanged) { + const { schemaChanged: _omit, ...rest } = listener as Record; + return realRegisterListener(rest as Parameters[0]); + } + return realRegisterListener(listener); + }); + + const stableQueries = [{ query: 'SELECT name FROM lists ORDER BY name', queryKey: ['bug2'] }]; + + const { result, unmount } = renderHook( + () => { + const { queries } = usePowerSyncQueries(stableQueries, queryClient); + return Tanstack.useQuery( + { + queryKey: ['bug2'], + queryFn: queries[0].queryFn as () => Promise<{ name: string }[]>, + staleTime: Infinity, + refetchOnMount: false, + refetchOnWindowFocus: false, + refetchOnReconnect: false + }, + queryClient + ); + }, + { wrapper } + ); + + await waitFor(() => expect(result.current.data).toEqual([]), { timeout: 2000, interval: 50 }); + + await db.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?)', ['from-first-sync']); + + await new Promise((r) => setTimeout(r, 150)); + + releaseResolve(); + + await waitFor(() => expect(result.current.data).toEqual([{ name: 'from-first-sync' }]), { + timeout: 2000, + interval: 50 + }); + + resolveSpy.mockRestore(); + registerListenerSpy.mockRestore(); + unmount(); + }); + }); +}); From 833436eeb12ed4a24e8e6ab0e8b80a7b45ad1cf7 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 22 Jun 2026 14:32:09 +0200 Subject: [PATCH 2/2] Minor cleanup of implementation and tests. --- .changeset/honest-streams-enable.md | 5 ++++ .../src/hooks/usePowerSyncQueries.ts | 23 ++++--------------- .../src/hooks/useQuery.ts | 2 +- .../tests/usePowerSyncQueries.test.tsx | 6 ++--- packages/tanstack-react-query/tests/utils.ts | 3 +++ 5 files changed, 15 insertions(+), 24 deletions(-) create mode 100644 .changeset/honest-streams-enable.md diff --git a/.changeset/honest-streams-enable.md b/.changeset/honest-streams-enable.md new file mode 100644 index 000000000..b063ca579 --- /dev/null +++ b/.changeset/honest-streams-enable.md @@ -0,0 +1,5 @@ +--- +'@powersync/tanstack-react-query': patch +--- + +Respect the user-provided `enabled` option in `useQuery` and `useQueries` instead of overriding it. Queries backed by sync streams now pause via TanStack's `skipToken` until their streams have synced, leaving `enabled` fully under the caller's control (`useSuspenseQuery` always runs, since suspense rejects `skipToken`). Also fixes a stale `streamsHaveSynced` value and a race where rows written while a query's source tables were still being resolved could be missed. diff --git a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts index c910b2358..98f136a6f 100644 --- a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts +++ b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts @@ -1,6 +1,6 @@ import { type CompilableQuery, parseQuery } from '@powersync/common'; import { QuerySyncStreamOptions, useAllSyncStreamsHaveSynced, usePowerSync } from '@powersync/react'; -import { useEffect, useState, useCallback, useMemo, useRef } from 'react'; +import { useEffect, useState, useCallback, useMemo } from 'react'; import * as Tanstack from '@tanstack/react-query'; export type UsePowerSyncQueriesInput = { @@ -111,17 +111,7 @@ export function usePowerSyncQueries( })) ); - // Tracks the []->[tables] transition to rescue data lost during pending resolveTables. - const tablesInitialized = useRef([]); - - if (tablesInitialized.current.length !== parsedQueries.length) { - tablesInitialized.current = parsedQueries.map(() => false); - } - useEffect(() => { - // Re-arm rescue tracking when queries change. - tablesInitialized.current = parsedQueries.map(() => false); - const listeners = parsedQueries.map((pq, idx) => { if (pq.parseError || !pq.query) { return null; @@ -158,18 +148,12 @@ export function usePowerSyncQueries( useEffect(() => { const aborts = parsedQueries.map((pq, idx) => { - if (pq.parseError || !pq.query) { + if (pq.parseError || !pq.query || !tablesArr[idx]?.length) { return null; } const abort = new AbortController(); - // Rescue data lost while resolveTables was pending (listener watched [] tables). - if (tablesArr[idx]?.length > 0 && !tablesInitialized.current[idx]) { - tablesInitialized.current[idx] = true; - queryClient.invalidateQueries({ queryKey: pq.queryKey }); - } - powerSync.onChangeWithCallback( { onChange: () => { @@ -181,7 +165,8 @@ export function usePowerSyncQueries( }, { tables: tablesArr[idx], - signal: abort.signal + signal: abort.signal, + triggerImmediate: true } ); diff --git a/packages/tanstack-react-query/src/hooks/useQuery.ts b/packages/tanstack-react-query/src/hooks/useQuery.ts index e1b9a38ec..8b886bd58 100644 --- a/packages/tanstack-react-query/src/hooks/useQuery.ts +++ b/packages/tanstack-react-query/src/hooks/useQuery.ts @@ -149,7 +149,7 @@ function useQueryCore< queryClient ); - const isSuspense = useQueryFn === Tanstack.useSuspenseQuery; + const isSuspense = (useQueryFn as unknown) === Tanstack.useSuspenseQuery; const resolvedQueryFn = query ? powerSyncQueryFn : resolvedOptions.queryFn; const queryFn = streamsHaveSynced || isSuspense ? resolvedQueryFn : Tanstack.skipToken; diff --git a/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx index 1d28ac0e5..5cf617dff 100644 --- a/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx +++ b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx @@ -50,7 +50,7 @@ describe('usePowerSyncQueries bug fixes', () => { } }); - describe('Bug 1: streamsHaveSynced in final useMemo deps', () => { + describe('usePowerSyncQueries ', () => { it('updated returned streamsHaveSynced once a waitForStream stream syncs', async () => { const stableQueries = [ { @@ -76,10 +76,8 @@ describe('usePowerSyncQueries bug fixes', () => { unmount(); }); - }); - describe('Bug 2: first table-resolution race loses first-sync data', () => { - it('reflects rows written while table resolution is still on the slow path', async () => { + it('picks up rows written before the source tables finished resolving', async () => { let releaseResolve!: () => void; const resolveGate = new Promise((resolve) => { releaseResolve = resolve; diff --git a/packages/tanstack-react-query/tests/utils.ts b/packages/tanstack-react-query/tests/utils.ts index fb3c11e50..d0d56e794 100644 --- a/packages/tanstack-react-query/tests/utils.ts +++ b/packages/tanstack-react-query/tests/utils.ts @@ -1,5 +1,6 @@ import * as commonSdk from '@powersync/common'; +import { cleanup } from '@testing-library/react'; import { PowerSyncDatabase } from '@powersync/web'; import { onTestFinished } from 'vitest'; @@ -14,6 +15,8 @@ export const openPowerSync = () => { }); onTestFinished(async () => { + cleanup(); + await new Promise((resolve) => setTimeout(resolve, 100)); await db.disconnectAndClear(); await db.close(); });