From de2d8aa444d70ce047b0231ab70e176cd28ac3b8 Mon Sep 17 00:00:00 2001 From: agrawal-siddharth Date: Sun, 7 Jun 2026 00:09:25 +0000 Subject: [PATCH] feat: scale up connection worker pool based on latency --- .../bigquery/storage/v1/ConnectionWorker.java | 124 +++++++-- .../storage/v1/ConnectionWorkerPoolTest.java | 50 ++++ .../storage/v1/ConnectionWorkerTest.java | 238 +++++++++++++++++- 3 files changed, 379 insertions(+), 33 deletions(-) diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 215176e7b46f..75aba8b3644b 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -52,6 +52,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -140,13 +141,16 @@ class ConnectionWorker implements AutoCloseable { * Tracks current inflight requests in the stream. */ @GuardedBy("lock") - private long inflightRequests = 0; + private final AtomicLong inflightRequests = new AtomicLong(0); /* * Tracks current inflight bytes in the stream. */ @GuardedBy("lock") - private long inflightBytes = 0; + private final AtomicLong inflightBytes = new AtomicLong(0); + + private final TrackRequestQueueEarliestSendTime trackRequestQueueEarliestSendTime = + new TrackRequestQueueEarliestSendTime(); /* * Tracks how often the stream was closed due to a retriable error. Streaming will stop when the @@ -395,7 +399,7 @@ private void gatherHealthCheckMetrics(HealthCheckFields healthCheckFields) { healthCheckFields.queuedRequestCountMax = windowedQueuedRequestsMax; healthCheckFields.queuedRetryCountMax = windowedQueuedRetriesMax; healthCheckFields.msecLongestResponseWaitTime = windowedMilliResponseWaitTimeMax; - healthCheckFields.inflightBytes = inflightBytes; + healthCheckFields.inflightBytes = inflightBytes.get(); healthCheckFields.requestsSentCount = windowedRequestsSent; healthCheckFields.responseCount = windowedResponsesAcked; if (HEALTH_CHECK_INTERVAL.toMillis() > 0) { @@ -779,8 +783,8 @@ private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWra @GuardedBy("lock") private void addMessageToWaitingQueue( AppendRequestAndResponse requestWrapper, boolean addToFront) { - ++this.inflightRequests; - this.inflightBytes += requestWrapper.messageSize; + this.inflightRequests.incrementAndGet(); + this.inflightBytes.addAndGet(requestWrapper.messageSize); hasMessageInWaitingQueue.signal(); requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); @@ -896,11 +900,11 @@ private ApiFuture appendInternal( } // Check if queue is going to be full before adding the request. if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { - if (this.inflightRequests + 1 >= this.maxInflightRequests) { + if (this.inflightRequests.get() + 1 >= this.maxInflightRequests) { throw new Exceptions.InflightRequestsLimitExceededException( writerId, this.maxInflightRequests); } - if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { + if (this.inflightBytes.get() + requestWrapper.messageSize >= this.maxInflightBytes) { throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes); } } @@ -926,8 +930,8 @@ private ApiFuture appendInternal( return requestWrapper.appendResult; } requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId); - ++this.inflightRequests; - this.inflightBytes += requestWrapper.messageSize; + this.inflightRequests.incrementAndGet(); + this.inflightBytes.addAndGet(requestWrapper.messageSize); requestWrapper.placedInWaitingQueueTime = Instant.now(); waitingRequestQueue.addLast(requestWrapper); healthCheckMetrics.updateWindowedQueuedRequestsMax( @@ -938,9 +942,9 @@ private ApiFuture appendInternal( try { maybeWaitForInflightQuota(); } catch (StatusRuntimeException ex) { - --this.inflightRequests; + this.inflightRequests.decrementAndGet(); waitingRequestQueue.pollLast(); - this.inflightBytes -= requestWrapper.messageSize; + this.inflightBytes.addAndGet(-requestWrapper.messageSize); throw ex; } requestProfilerHook.endOperation( @@ -954,8 +958,8 @@ private ApiFuture appendInternal( @GuardedBy("lock") private void maybeWaitForInflightQuota() { long start_time = System.currentTimeMillis(); - while (this.inflightRequests >= this.maxInflightRequests - || this.inflightBytes >= this.maxInflightBytes) { + while (this.inflightRequests.get() >= this.maxInflightRequests + || this.inflightBytes.get() >= this.maxInflightBytes) { try { inflightReduced.await(100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -998,6 +1002,11 @@ void setTestOnlyRunTimeExceptionInAppendLoop( this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop; } + @VisibleForTesting + Instant getEarliestSendTime() { + return trackRequestQueueEarliestSendTime.getEarliestSendTime(); + } + @VisibleForTesting() HealthCheckMetrics.HealthCheckFields gatherTestOnlyHealthCheckMetrics() { this.lock.lock(); @@ -1229,7 +1238,9 @@ private void appendLoop() { firstRequestForTableOrSchemaSwitch = true; } while (!localQueue.isEmpty()) { - localQueue.peekFirst().setRequestSendQueueTime(); + AppendRequestAndResponse head = localQueue.peekFirst(); + head.setRequestSendQueueTime(); + trackRequestQueueEarliestSendTime.captureEarliest(head.requestSendTimeStamp); AppendRequestAndResponse wrapper = localQueue.pollFirst(); AppendRowsRequest originalRequest = wrapper.message; String requestUniqueId = wrapper.requestUniqueId; @@ -1642,6 +1653,9 @@ private void requestCallback(AppendRowsResponse response) { if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { log.info("Attempting to retry on error: " + response.getError().toString()); + // Note that if we are retrying a request it is still in the system so we don't refresh the + // earliest send time. That way we can keep track of the earliest send time based on the + // first time the request was sent, which gives us a better idea of load on this worker. return; } } @@ -1653,6 +1667,10 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } } + // Since we have processed a response and have now removed that request from the system, go + // ahead and refresh the earliest send time, based on the remaining requests that are + // outstanding. + trackRequestQueueEarliestSendTime.discardAndRefresh(); // We need a separate thread pool to unblock the next request callback. // Otherwise user may call append inside request callback, which may be blocked on waiting @@ -1788,8 +1806,8 @@ private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) { AppendRequestAndResponse requestWrapper = pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll(); requestWrapper.requestSendTimeStamp = null; - --this.inflightRequests; - this.inflightBytes -= requestWrapper.messageSize; + this.inflightRequests.decrementAndGet(); + this.inflightBytes.addAndGet(-requestWrapper.messageSize); this.inflightReduced.signal(); return requestWrapper; } @@ -1881,9 +1899,15 @@ void setRequestSendQueueTime() { /** Returns the current workload of this worker. */ public Load getLoad() { + Duration timeSinceLastCallback = Duration.ZERO; + Instant earliestSendTime = trackRequestQueueEarliestSendTime.getEarliestSendTime(); + if (earliestSendTime != null) { + timeSinceLastCallback = Duration.between(earliestSendTime, Instant.now()); + } return Load.create( - inflightBytes, - inflightRequests, + timeSinceLastCallback, + inflightBytes.get(), + inflightRequests.get(), destinationSet.size(), maxInflightBytes, maxInflightRequests); @@ -1896,11 +1920,15 @@ public Load getLoad() { @AutoValue public abstract static class Load { - // Consider the load on this worker to be overwhelmed when above some percentage of - // in-flight bytes or in-flight requests count. + // Consider the load on this worker to be overwhelmed when above some inflight latency or + // percentage of in-flight bytes or in-flight requests count. + private static Duration overwhelmedTimeSinceLastCallback = Duration.ofSeconds(3); private static double overwhelmedInflightCount = 0.2; private static double overwhelmedInflightBytes = 0.2; + // Time we have spent waiting for a response in the worker. + abstract Duration timeSinceLastCallback(); + // Number of in-flight requests bytes in the worker. abstract long inFlightRequestsBytes(); @@ -1917,12 +1945,14 @@ public abstract static class Load { abstract long maxInflightCount(); static Load create( + Duration timeSinceLastCallback, long inFlightRequestsBytes, long inFlightRequestsCount, long destinationCount, long maxInflightBytes, long maxInflightCount) { return new AutoValue_ConnectionWorker_Load( + timeSinceLastCallback, inFlightRequestsBytes, inFlightRequestsCount, destinationCount, @@ -1934,20 +1964,29 @@ boolean isOverwhelmed() { // Consider only in flight bytes and count for now, as by experiment those two are the most // efficient and has great simplity. return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount() - || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes(); + || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes() + || timeSinceLastCallback().compareTo(overwhelmedTimeSinceLastCallback) > 0; } - // Compares two different load. First compare in flight request bytes split by size 1024 bucket. + // Compares two different load. First compare the timeSinceLastCallback bucketed into 1 second + // intervals. + // Then compare in flight request bytes split by size 1024 bucket. // Then compare the inflight requests count. // Then compare destination count of the two connections. public static final Comparator LOAD_COMPARATOR = - Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) + Comparator.comparing((Load key) -> (int) key.timeSinceLastCallback().getSeconds()) + .thenComparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) .thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100)) .thenComparing(Load::destinationCount); // Compares two different load without bucket, used in smaller scale unit testing. + // First compare the timeSinceLastCallback. + // Then compare in flight request bytes. + // Then compare the inflight requests count. + // Then compare destination count of the two connections. public static final Comparator TEST_LOAD_COMPARATOR = - Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes()) + Comparator.comparing(Load::timeSinceLastCallback) + .thenComparing((Load key) -> (int) key.inFlightRequestsBytes()) .thenComparing((Load key) -> (int) key.inFlightRequestsCount()) .thenComparing(Load::destinationCount); @@ -1960,6 +1999,11 @@ public static void setOverwhelmedBytesThreshold(double newThreshold) { public static void setOverwhelmedCountsThreshold(double newThreshold) { overwhelmedInflightCount = newThreshold; } + + @VisibleForTesting + public static void setOverwhelmedTimeSinceLastCallbackThreshold(Duration newThreshold) { + overwhelmedTimeSinceLastCallback = newThreshold; + } } @VisibleForTesting @@ -1985,4 +2029,36 @@ static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedS return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema); } } + + class TrackRequestQueueEarliestSendTime { + private final AtomicReference earliestSendTime = new AtomicReference<>(null); + + public void captureEarliest(Instant sendTime) { + // This method records the given sendTime only if earliestSendTime is currently NULL. + if (sendTime == null) { + return; + } + earliestSendTime.compareAndSet(null, sendTime); + } + + public void discardAndRefresh() { + Instant newEarliestSendTime = null; + lock.lock(); + try { + if (!inflightRequestQueue.isEmpty()) { + AppendRequestAndResponse head = inflightRequestQueue.peekFirst(); + if (head != null) { + newEarliestSendTime = head.requestSendTimeStamp; + } + } + } finally { + lock.unlock(); + } + earliestSendTime.set(newEarliestSendTime); + } + + public Instant getEarliestSendTime() { + return earliestSendTime.get(); + } + } } diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 51fea1232b11..e0a376adf03e 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -89,6 +89,7 @@ void setUp() throws Exception { .build(); ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5); ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6); + ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3)); } @Test @@ -555,6 +556,55 @@ private ProtoRows createProtoRows(String[] messages) { return rowsBuilder.build(); } + @Test + void testSingleTableConnections_overwhelmed_timeSinceLastCallback() throws Exception { + // Set count/bytes thresholds to be very high so they don't trigger. + ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.9); + ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.9); + // Set time threshold to 100ms. + ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofMillis(100)); + + // We use a pool with max 8 connections. + ConnectionWorkerPool.setOptions( + Settings.builder() + .setMinConnectionsPerRegion(1) // Start with 1 connection to make scaling obvious. + .setMaxConnectionsPerRegion(8) + .build()); + + // We set maxRequests to a large value (100) so it's not overwhelmed by count (threshold 90). + ConnectionWorkerPool connectionWorkerPool = + createConnectionWorkerPool( + /* maxRequests= */ 100, /* maxBytes= */ 1000000, java.time.Duration.ofSeconds(5)); + + // Stuck requests for 500ms (larger than 100ms threshold). + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); + + // Send 1 request. It will go to Connection 1. + testBigQueryWrite.addResponse(createAppendResponse(0)); + StreamWriter writer = getTestStreamWriter(TEST_STREAM_1); + + ApiFuture future1 = + sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"0"}, 0); + + // Wait 500ms. Request 1 is still in flight (needs 1000ms). + // Connection 1 timeSinceLastCallback should be ~500ms > 100ms. + // So Connection 1 is now overwhelmed. + Thread.sleep(500); + + // Send Request 2. Since Connection 1 is overwhelmed, it should scale up and create Connection + // 2. + testBigQueryWrite.addResponse(createAppendResponse(1)); + ApiFuture future2 = + sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"1"}, 1); + + // Wait for both to finish. + future1.get(); + future2.get(); + + // Verify that we created 2 connections. + assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(2); + } + ConnectionWorkerPool createConnectionWorkerPool( long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) { ConnectionWorkerPool.enableTestingLogic(); diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 44bb25105d12..6e4ee2642a6a 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.nio.channels.Channels; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -94,6 +95,9 @@ void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); ConnectionWorker.setMaxInflightQueueWaitTime(300000); ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10)); + ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.2); + ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.2); + ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3)); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); @@ -865,29 +869,116 @@ void testLoadCompare_compareLoad() { // In flight bytes bucket is split as per 1024 requests per bucket. // When in flight bytes is in lower bucket, even destination count is higher and request count // is higher, the load is still smaller. - Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10); - Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10); + Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 1000, 2000, 100, 1000, 10); + Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 2000, 1000, 10, 1000, 10); assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); // In flight bytes in the same bucke of request bytes will compare request count. - Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10); - Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10); + Load load3 = ConnectionWorker.Load.create(Duration.ZERO, 1, 300, 10, 0, 10); + Load load4 = ConnectionWorker.Load.create(Duration.ZERO, 10, 1, 10, 0, 10); assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0); // In flight request and bytes in the same bucket will compare the destination count. - Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10); - Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10); + Load load5 = ConnectionWorker.Load.create(Duration.ZERO, 200, 1, 10, 1000, 10); + Load load6 = ConnectionWorker.Load.create(Duration.ZERO, 100, 10, 10, 1000, 10); assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue(); + + // timeSinceLastCallback has the highest priority. + // load7 has higher timeSinceLastCallback (2s -> bucket 2) but lower other parameters. + // load8 has lower timeSinceLastCallback (0s -> bucket 0) but higher other parameters. + Load load7 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 10, 10); + Load load8 = ConnectionWorker.Load.create(Duration.ZERO, 10000, 10000, 100, 10, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load7, load8)).isGreaterThan(0); } @Test void testLoadIsOverWhelmed() { - // Only in flight request is considered in current overwhelmed calculation. - Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100); + // In-flight requests, bytes, and timeSinceLastCallback are considered in overwhelmed + // calculation. + + // Overwhelmed by request count + Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 60, 10, 100, 90, 100); assertThat(load1.isOverwhelmed()).isTrue(); - Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100); + // Not overwhelmed + Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100); assertThat(load2.isOverwhelmed()).isFalse(); + + // Under threshold (3s) for timeSinceLastCallback + Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100); + assertThat(load3.isOverwhelmed()).isFalse(); + + // Over threshold (3s) for timeSinceLastCallback + Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100); + assertThat(load4.isOverwhelmed()).isTrue(); + } + + @Test + void testGetLoad_timeSinceLastCallback() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + try (ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + createProtoSchema("foo"), + 10, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /* enableRequestProfiler= */ false, + /* enableOpenTelemetry= */ false, + /*isMultiplexing*/ false)) { + + // Initially empty, should be zero. + assertThat(connectionWorker.getLoad().timeSinceLastCallback()).isEqualTo(Duration.ZERO); + + // Keep response in flight + testBigQueryWrite.setResponseSleep(java.time.Duration.ofSeconds(5)); + + // Send a message + ApiFuture future = + sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"hello"}), 0); + + // Wait a bit to ensure it is sent and in flight queue + Thread.sleep(500); + + Load load = connectionWorker.getLoad(); + assertThat(load.timeSinceLastCallback()).isGreaterThan(Duration.ZERO); + assertThat(load.timeSinceLastCallback()) + .isLessThan(Duration.ofSeconds(2)); // Should be around 500ms + } + } + + @Test + void testLoadCompare_timeSinceLastCallback() { + // Same bytes, same count, same destination, different timeSinceLastCallback + // Bucketed by 1 second (1000ms). + + // 100ms and 200ms are in the same bucket (0). + Load load1 = ConnectionWorker.Load.create(Duration.ofMillis(100), 0, 0, 0, 0, 0); + Load load2 = ConnectionWorker.Load.create(Duration.ofMillis(200), 0, 0, 0, 0, 0); + assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isEqualTo(0); + + // 100ms and 1200ms are in different buckets (0 vs 1). + Load load3 = ConnectionWorker.Load.create(Duration.ofMillis(1200), 0, 0, 0, 0, 0); + assertThat(Load.LOAD_COMPARATOR.compare(load1, load3)).isLessThan(0); + assertThat(Load.LOAD_COMPARATOR.compare(load3, load1)).isGreaterThan(0); + } + + @Test + void testTestLoadCompare_timeSinceLastCallback() { + // TEST_LOAD_COMPARATOR compares timeSinceLastCallback unbucketed. + // 1s and 2s should be different. + Load load1 = ConnectionWorker.Load.create(Duration.ofSeconds(1), 0, 0, 0, 0, 0); + Load load2 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 0, 0); + assertThat(Load.TEST_LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); + assertThat(Load.TEST_LOAD_COMPARATOR.compare(load2, load1)).isGreaterThan(0); } @Test @@ -1433,6 +1524,24 @@ public FakeBigQueryWriteImpl.Response get() { } } + private ConnectionWorker createConnectionWorker() throws IOException { + return new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /* enableRequestProfiler= */ false, + /* enableOpenTelemetry= */ false, + /* isMultiplexing= */ false); + } + @Test void testInflightRetryCountHealthMetricExactlyOnce() throws Exception { ProtoSchema schema1 = createProtoSchema("foo"); @@ -1504,4 +1613,115 @@ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception { assertEquals(3, healthCheckFields.responseCodes.get(Status.Code.OK.value())); assertEquals("projects/p1/datasets/d1/tables/t1/streams/s1", healthCheckFields.streamName); } + + @Test + void testEarliestSendTime_outstandingRequest() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + try (ConnectionWorker connectionWorker = createConnectionWorker()) { + // Stuck response to keep request in flight + testBigQueryWrite.setResponseSleep(java.time.Duration.ofSeconds(10)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + Instant beforeSend = Instant.now(); + ApiFuture future = + sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"0"}), 0); + + // Wait a bit to ensure it is sent and send time is captured + Thread.sleep(500); + Instant afterSend = Instant.now(); + + Instant earliestSendTime = connectionWorker.getEarliestSendTime(); + assertThat(earliestSendTime).isNotNull(); + assertThat(earliestSendTime).isAtLeast(beforeSend); + assertThat(earliestSendTime).isAtMost(afterSend); + + // Clean up + testBigQueryWrite.setResponseSleep(java.time.Duration.ZERO); + future.get(); + } + } + + @Test + void testEarliestSendTime_requestSuccess() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + try (ConnectionWorker connectionWorker = createConnectionWorker()) { + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture future = + sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"0"}), 0); + + // Wait for success response + future.get(); + + // Verify state is NULL + assertThat(connectionWorker.getEarliestSendTime()).isNull(); + } + } + + @Test + void testEarliestSendTime_retryScenario() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + try (ConnectionWorker connectionWorker = createConnectionWorker()) { + // Stuck first response to ensure we can capture send time + testBigQueryWrite.setResponseSleep(java.time.Duration.ofMillis(500)); + + // Fail enough times to exhaust retries (maxAttempts + 1) + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(0)), + /* totalFailCount= */ retrySettings.getMaxAttempts() + 1, + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build())); + + Instant beforeSend = Instant.now(); + ApiFuture future = + sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"0"}), 0); + + // Wait for first attempt to be sent (response sleep is 500ms) + Thread.sleep(200); + Instant afterSend = Instant.now(); + + Instant earliestSendTime = connectionWorker.getEarliestSendTime(); + assertThat(earliestSendTime).isNotNull(); + assertThat(earliestSendTime).isAtLeast(beforeSend); + assertThat(earliestSendTime).isAtMost(afterSend); + + // Wait for first response to arrive (500ms) and retry to be scheduled. + // Retry will be scheduled with 500ms delay (initialRetryDelay). + // So it will be sent at ~1000ms. + // We check state at 800ms (after first failure but before retry is sent). + Thread.sleep(600); // 200ms + 600ms = 800ms from start. + + Instant earliestSendTimeAfterFirstFailure = connectionWorker.getEarliestSendTime(); + assertThat(earliestSendTimeAfterFirstFailure).isEqualTo(earliestSendTime); + + // Wait for all retries to exhaust and fail. + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + future.get(5, TimeUnit.SECONDS); + }); + assertThat(ex.getCause()).isInstanceOf(StatusRuntimeException.class); + assertThat(((StatusRuntimeException) ex.getCause()).getStatus().getCode()) + .isEqualTo(Code.INTERNAL); + + // Once exhausted, state should be null + assertThat(connectionWorker.getEarliestSendTime()).isNull(); + } + } }