Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
c772c91
fix(pool): give pooled store-and-forward senders distinct slot ids
bluestreak01 Jun 17, 2026
49b598a
Harden pool teardown against Errors + exclude pool slot namespace fro…
bluestreak01 Jun 18, 2026
2311b98
Fix SF pool freeing a slot index while its flock is still held
bluestreak01 Jun 18, 2026
5422b67
Merge remote-tracking branch 'origin/main' into fix/pool-sf-slot-coll…
bluestreak01 Jun 18, 2026
90ecef2
test(pool): migrate SenderPoolSfTest to ephemeral-port TestWebSocketS…
bluestreak01 Jun 18, 2026
b1b8dfa
Make leaked SF slots observable + harden pool construction
bluestreak01 Jun 18, 2026
9f00b31
fix(pool): catch Throwable on all pool creation paths, not just Runti…
bluestreak01 Jun 18, 2026
3024b5e
fix(pool): bound orphan-drain exclusion to [0,maxSize) so shrinking m…
bluestreak01 Jun 18, 2026
7118264
fix(pool): discard pooled sender when flush() exits with an Error
bluestreak01 Jun 18, 2026
9bdda71
fix(pool): widen QuestDBImpl ctor cleanup catch to Throwable
bluestreak01 Jun 18, 2026
a2610a6
fix(pool): recover stranded in-range SF slots at pool startup
bluestreak01 Jun 18, 2026
e7d893a
test(pool): cover flush() Error -> discardBroken regression
bluestreak01 Jun 18, 2026
b504e16
test(pool): cover borrow-path SF slot-index release on creation failure
bluestreak01 Jun 18, 2026
d0c03f5
fix(pool): bound startup SF recovery to a shared acquire-timeout budget
bluestreak01 Jun 20, 2026
2709cba
refactor(sf): drop superseded prefix-exclusion OrphanScanner.scan ove…
bluestreak01 Jun 20, 2026
edc38dd
fix(pool): retire stranded SF slot on startup when recoverer keeps flock
bluestreak01 Jun 20, 2026
48c33c8
test(pool): harden teardown coverage and fix native leak in teardown …
bluestreak01 Jun 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,15 @@ final class LineSenderBuilder {
// runtime lands in a follow-up commit. For now we surface the
// count via logging so users can confirm orphans are being seen.
private boolean drainOrphans = false;
// Orphan-scan exclusion for the connection pool. The pool co-manages
// exactly <orphanDrainBase>-<i> for i in [0, orphanDrainSlotCount) and
// recovers each of those on (re)creation, so pooled senders must never
// treat one another's live slots as drainable orphans. Anything else --
// a different base, a bare un-suffixed id, OR a same-base index at or
// above the count (a slot left behind by a larger pool before maxSize
// shrank) -- is still drained, so unacked data is never stranded.
private String orphanDrainBase;
private int orphanDrainSlotCount;
private long durableAckKeepaliveIntervalMillis = DURABLE_ACK_KEEPALIVE_NOT_SET;
// Optional user-supplied async error handler. When null, the sender
// uses DefaultSenderErrorHandler.INSTANCE (loud-not-silent log).
Expand Down Expand Up @@ -1472,7 +1481,15 @@ public Sender build() {
} else {
if (!Files.exists(sfDir)) {
int rc = Files.mkdir(sfDir, Files.DIR_MODE_DEFAULT);
if (rc != 0) {
// mkdir is non-zero on failure, but "already exists"
// is one such failure. Multiple SF senders sharing one
// sf_dir can be built concurrently (the pool calls
// build() outside its lock), so two threads can both
// pass the exists() check and race into mkdir; the
// loser gets EEXIST. Treat a benign creation race --
// the dir now exists -- as success and only fail when
// the directory is genuinely absent afterwards.
if (rc != 0 && !Files.exists(sfDir)) {
throw new LineSenderException(
"could not create sf_dir: " + sfDir + " rc=" + rc);
}
Expand Down Expand Up @@ -1548,7 +1565,7 @@ public Sender build() {
if (drainOrphans && sfDir != null) {
io.questdb.client.std.ObjList<String> orphans =
io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner
.scan(sfDir, senderId);
.scan(sfDir, senderId, orphanDrainBase, orphanDrainSlotCount);
if (orphans.size() > 0) {
org.slf4j.LoggerFactory.getLogger(LineSenderBuilder.class)
.info("dispatching drainers for {} orphan slot(s) under {} "
Expand Down Expand Up @@ -2471,6 +2488,71 @@ public LineSenderBuilder senderId(String id) {
return this;
}

/**
* The slot id ({@code sender_id}) currently configured on this
* builder, either parsed from the config string or left at its
* {@code "default"} default. Introspection hook for the connection
* pool, which derives a distinct per-slot id from this base so that
* multiple pooled senders sharing one {@code sf_dir} don't collide
* on the slot {@code flock}.
*/
public String getConfiguredSenderId() {
return senderId;
}

/**
* The store-and-forward group root ({@code sf_dir}) currently
* configured on this builder, or {@code null} when SF is disabled.
* Introspection hook for the connection pool, which needs the group
* root to locate its own managed slot dirs {@code <sf_dir>/<base>-<i>}
* when recovering unacked data a previous run left behind.
*/
public String getConfiguredSfDir() {
return sfDir;
}

/**
* Excludes the connection pool's <em>live</em> slot set from
* {@link #drainOrphans(boolean)} scanning: a sibling slot under
* {@code sf_dir} named {@code <base>-<index>} with
* {@code 0 <= index < slotCount} is never treated as a drainable orphan.
* <p>
* Internal introspection hook for the connection pool. The pool gives
* each pooled SF sender a distinct slot id {@code <base>-<index>} and
* recovers each slot's unacked data itself when it (re)creates that
* slot. Without this exclusion, one pooled sender's startup drainer
* could adopt a sibling pool slot's lock and dir, reintroducing the
* very "sf slot already in use" collision the per-slot ids were added
* to prevent.
* <p>
* Unlike a blanket {@code <base>-} prefix exclusion, the bound is the
* pool's {@code maxSize}: a same-base slot whose index is at or above
* {@code slotCount} (e.g. {@code <base>-3} left behind by a larger pool
* before {@code maxSize} shrank from 4 to 2) is NOT excluded and is
* drained like any foreign leftover, so its unacked data is recovered
* instead of being silently stranded. Foreign leftovers (a different
* base, or a bare un-suffixed id) are also still drained.
* <p>
* Pass a {@code null}/empty base or {@code slotCount <= 0} to disable
* the exclusion (the default).
*/
public LineSenderBuilder orphanDrainExcludeManagedSlots(String base, int slotCount) {
this.orphanDrainBase = base;
this.orphanDrainSlotCount = slotCount;
return this;
}

/**
* True iff store-and-forward is enabled (an {@code sf_dir} was set).
* Introspection hook for the connection pool: SF senders own an
* exclusive on-disk slot, so each pooled sender needs its own slot
* id, whereas non-SF (memory-mode / HTTP / TCP) senders share no
* such resource and need no per-slot identity.
*/
public boolean isStoreAndForwardEnabled() {
return sfDir != null;
}

/**
* Per-call deadline for {@code Sender.flush()} spinning on a full
* cursor segment ring waiting for ACKs to drain space. Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ public class QwpWebSocketSender implements Sender {
private Sender.InitialConnectMode initialConnectMode = Sender.InitialConnectMode.OFF;
private boolean ownsCursorEngine;
private long pendingBytes;
// Set true by close() once the SF slot flock has been released (the normal
// teardown path). Stays false if close() bailed early with the I/O thread
// still running -- then cursorEngine.close() never ran and the flock is
// still held, so the owning pool MUST keep the slot reserved rather than
// hand the still-locked dir to the next borrow ("sf slot already in use").
private boolean slotLockReleased;
private int pendingRowCount;
private SenderProgressDispatcher progressDispatcher;
// Async-delivery sink for ack-watermark advances. Default no-op; a
Expand Down Expand Up @@ -1087,6 +1093,10 @@ public void close() {
cursorEngine = null;
ownsCursorEngine = false;
}
// Past the ioThreadStopped guard => cursorEngine.close() ran and
// released the SF flock in its finally (or this sender owned no
// engine holding one). Signal the pool it may reuse the slot.
slotLockReleased = true;

// Shutdown order: dispatcher last, after the I/O loop has stopped
// producing into it. close() drains pending entries with a short
Expand Down Expand Up @@ -1129,6 +1139,16 @@ public void close() {
}
}

/**
* True once {@link #close()} has released the store-and-forward slot
* flock. False means close() leaked the still-running I/O thread (and its
* resources), so the flock is still held; the owning pool must keep the
* slot index reserved instead of reusing the still-locked slot dir.
*/
public boolean isSlotLockReleased() {
return slotLockReleased;
}

@Override
public Sender decimalColumn(CharSequence name, Decimal64 value) {
checkNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,47 @@ private OrphanScanner() {
* "no orphans" answer in that case.
*/
public static ObjList<String> scan(String sfDir, String excludeSlotName) {
// Thin delegate to the managed-aware scan: a null managedBase disables
// managed-slot exclusion, so this is exactly the unmanaged scan. Kept
// as a convenience overload for callers (and tests) with no pool-minted
// slot namespace to skip.
return scan(sfDir, excludeSlotName, null, 0);
}

/**
* As {@link #scan(String, String)}, but excludes only the <em>exact</em>
* set of slot dirs a connection pool can re-create and self-recover:
* {@code <managedBase>-<i>} for {@code 0 <= i < managedSlotCount}.
* <p>
* The exclusion is bounded to the canonical pool-minted slots rather than
* the <em>whole</em> {@code <base>-} namespace, so unacked data is never
* stranded after a {@code maxSize} shrink across restarts: a slot like
* {@code <base>-3} left over from a larger pool is neither re-created (out
* of the new {@code [0,maxSize)} index range) nor silently excluded. By
* bounding the exclusion to {@code [0,managedSlotCount)},
* any same-base slot with an index at or above {@code managedSlotCount} is
* treated like a foreign leftover and becomes a drainable orphan, so its
* data is recovered through the normal drain path.
* <p>
* Only canonical, pool-minted names are excluded: the suffix after
* {@code <managedBase>-} must be a canonical non-negative decimal
* ({@code 0,1,2,...} with no leading zeros, sign, or non-digits). Anything
* else under the same base ({@code <base>-foo}, {@code <base>-007}) is not a
* name the pool creates and is reported as a candidate.
* <p>
* When {@code managedBase} is null/empty or {@code managedSlotCount <= 0}
* no exclusion is applied (every sibling with data is a candidate).
*/
public static ObjList<String> scan(String sfDir, String excludeSlotName, String managedBase, int managedSlotCount) {
ObjList<String> orphans = new ObjList<>();
if (sfDir == null || !Files.exists(sfDir)) {
return orphans;
}
boolean hasManaged = managedBase != null && !managedBase.isEmpty() && managedSlotCount > 0;
String managedPrefix = hasManaged ? managedBase + "-" : null;
long find = Files.findFirst(sfDir);
if (find < 0) {
LOG.warn("orphan scan could not enumerate {} treating as no orphans, "
LOG.warn("orphan scan could not enumerate {} \u2014 treating as no orphans, "
+ "but this may indicate a permission or transient error", sfDir);
return orphans;
}
Expand All @@ -99,6 +133,9 @@ public static ObjList<String> scan(String sfDir, String excludeSlotName) {
if (excludeSlotName != null && excludeSlotName.equals(name)) {
continue;
}
if (hasManaged && isManagedSlot(name, managedPrefix, managedSlotCount)) {
continue;
}
String slotPath = sfDir + "/" + name;
if (!isCandidateOrphan(slotPath)) {
continue;
Expand All @@ -111,6 +148,50 @@ public static ObjList<String> scan(String sfDir, String excludeSlotName) {
return orphans;
}

/**
* True iff {@code name} is a slot the pool actively co-manages, i.e.
* {@code <managedPrefix><i>} where {@code i} is a canonical non-negative
* decimal in {@code [0, managedSlotCount)}. Visible for testing.
*/
public static boolean isManagedSlot(String name, String managedPrefix, int managedSlotCount) {
if (name == null || managedPrefix == null || !name.startsWith(managedPrefix)) {
return false;
}
int idx = parseCanonicalIndex(name, managedPrefix.length());
return idx >= 0 && idx < managedSlotCount;
}

/**
* Parses the canonical non-negative decimal that makes up the rest of
* {@code name} from {@code from}. Returns {@code -1} for an empty suffix,
* a non-digit, a leading zero (e.g. {@code "007"}), or anything that would
* overflow {@code int}. Only the exact form the pool emits
* ({@code Integer.toString(index)}) is accepted, so foreign or malformed
* same-base names never get mistaken for a managed slot.
*/
private static int parseCanonicalIndex(String name, int from) {
int len = name.length();
if (from >= len) {
return -1;
}
// Reject leading zeros unless the whole suffix is exactly "0".
if (name.charAt(from) == '0' && len - from > 1) {
return -1;
}
long acc = 0;
for (int i = from; i < len; i++) {
char c = name.charAt(i);
if (c < '0' || c > '9') {
return -1;
}
acc = acc * 10 + (c - '0');
if (acc > Integer.MAX_VALUE) {
return -1;
}
}
return (int) acc;
}

/**
* True iff {@code slotPath} looks like a slot dir with unacked data
* and no failure sentinel. Visible for testing.
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/io/questdb/client/impl/PoolHousekeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,21 @@ private void runLoop() {
}
try {
senderPool.reapIdle();
} catch (RuntimeException ignored) {
// Reaping must not propagate -- it's best-effort housekeeping.
} catch (Throwable ignored) {
// Defensive, intentionally unreachable in normal operation:
// SenderPool.reapIdle() already swallows per-delegate close()
// failures internally. The outer catch is a belt-and-braces
// guard. Reaping must not propagate -- it's best-effort
// housekeeping. Catch Throwable (not just RuntimeException) so
// an Error from a delegate teardown can never kill this daemon
// thread and stop all future reaping for the life of the handle.
}
try {
queryPool.reapIdle();
} catch (RuntimeException ignored) {
} catch (Throwable ignored) {
// Same rationale as the senderPool guard above: best-effort,
// must never propagate, and Throwable (not RuntimeException) so
// an Error from query-client teardown cannot kill the daemon.
}
}
}
Expand Down
43 changes: 30 additions & 13 deletions core/src/main/java/io/questdb/client/impl/PooledSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ public final class PooledSender implements Sender {
private final long createdAtMillis;
private final Sender delegate;
private final SenderPool pool;
// Index of the store-and-forward slot this wrapper owns within the pool,
// or -1 when SF is disabled. Stable for the wrapper's whole life; the
// pool returns it to the free set only when the wrapper is evicted from
// {@code all} (discardBroken / reapIdle). Used to derive a distinct
// {@code sender_id} per pooled sender so concurrent SF senders sharing
// one {@code sf_dir} never collide on the slot {@code flock}.
private final int slotIndex;
private volatile long idleSinceMillis;
private volatile boolean inUse;
private volatile boolean invalidated;

PooledSender(Sender delegate, SenderPool pool) {
PooledSender(Sender delegate, SenderPool pool, int slotIndex) {
this.delegate = delegate;
this.pool = pool;
this.slotIndex = slotIndex;
this.createdAtMillis = System.currentTimeMillis();
this.idleSinceMillis = this.createdAtMillis;
}
Expand Down Expand Up @@ -148,17 +156,15 @@ public void close() {
if (!inUse) {
return;
}
boolean broken = false;
// Track normal completion rather than catching a specific throwable
// type. flush() can exit abnormally with an Error (AssertionError
// under -ea, OutOfMemoryError, ...) as well as a RuntimeException;
// keying the recycle decision off normal completion treats every
// abnormal exit as unrecyclable, which is the fail-safe default.
boolean flushed = false;
try {
delegate.flush();
} catch (RuntimeException e) {
// Sender does not clear its buffer on flush failure (see
// Sender Javadoc), and WebSocket transport latches the failure
// for good. Either way, the wrapper is unsafe to recycle: the
// next borrower would inherit the failed rows or a dead
// connection.
broken = true;
throw e;
flushed = true;
} finally {
inUse = false;
// Clear the pin BEFORE returning the slot. If we cleared
Expand All @@ -167,10 +173,17 @@ public void close() {
// re-pin on this thread would return the (now in-use)
// wrapper -- the same race this clear is meant to close.
pool.clearPinIfCurrent(this);
if (broken) {
pool.discardBroken(this);
} else {
if (flushed) {
pool.giveBack(this);
} else {
// flush() did not complete normally. Sender does not clear
// its buffer on flush failure (see Sender Javadoc), and
// WebSocket transport latches the failure for good. Either
// way the wrapper is unsafe to recycle: the next borrower
// would inherit the failed rows or a dead connection. The
// original throwable propagates naturally once this finally
// returns -- no explicit rethrow needed.
pool.discardBroken(this);
}
}
}
Expand Down Expand Up @@ -372,6 +385,10 @@ long createdAtMillis() {
return createdAtMillis;
}

int slotIndex() {
return slotIndex;
}

Sender delegate() {
return delegate;
}
Expand Down
Loading
Loading