Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -140,13 +141,16 @@ class ConnectionWorker implements AutoCloseable {
* Tracks current inflight requests in the stream.
*/
@GuardedBy("lock")
private long inflightRequests = 0;
private final AtomicLong inflightRequests = new AtomicLong(0);

/*
* Tracks current inflight bytes in the stream.
*/
@GuardedBy("lock")
private long inflightBytes = 0;
private final AtomicLong inflightBytes = new AtomicLong(0);

private final TrackRequestQueueEarliestSendTime trackRequestQueueEarliestSendTime =
new TrackRequestQueueEarliestSendTime();

/*
* Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
Expand Down Expand Up @@ -395,7 +399,7 @@ private void gatherHealthCheckMetrics(HealthCheckFields healthCheckFields) {
healthCheckFields.queuedRequestCountMax = windowedQueuedRequestsMax;
healthCheckFields.queuedRetryCountMax = windowedQueuedRetriesMax;
healthCheckFields.msecLongestResponseWaitTime = windowedMilliResponseWaitTimeMax;
healthCheckFields.inflightBytes = inflightBytes;
healthCheckFields.inflightBytes = inflightBytes.get();
healthCheckFields.requestsSentCount = windowedRequestsSent;
healthCheckFields.responseCount = windowedResponsesAcked;
if (HEALTH_CHECK_INTERVAL.toMillis() > 0) {
Expand Down Expand Up @@ -779,8 +783,8 @@ private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWra
@GuardedBy("lock")
private void addMessageToWaitingQueue(
AppendRequestAndResponse requestWrapper, boolean addToFront) {
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
this.inflightRequests.incrementAndGet();
this.inflightBytes.addAndGet(requestWrapper.messageSize);
hasMessageInWaitingQueue.signal();
requestProfilerHook.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
Expand Down Expand Up @@ -896,11 +900,11 @@ private ApiFuture<AppendRowsResponse> appendInternal(
}
// Check if queue is going to be full before adding the request.
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
if (this.inflightRequests + 1 >= this.maxInflightRequests) {
if (this.inflightRequests.get() + 1 >= this.maxInflightRequests) {
throw new Exceptions.InflightRequestsLimitExceededException(
writerId, this.maxInflightRequests);
}
if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
if (this.inflightBytes.get() + requestWrapper.messageSize >= this.maxInflightBytes) {
throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes);
}
}
Expand All @@ -926,8 +930,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
return requestWrapper.appendResult;
}
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
this.inflightRequests.incrementAndGet();
this.inflightBytes.addAndGet(requestWrapper.messageSize);
requestWrapper.placedInWaitingQueueTime = Instant.now();
waitingRequestQueue.addLast(requestWrapper);
healthCheckMetrics.updateWindowedQueuedRequestsMax(
Expand All @@ -938,9 +942,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
try {
maybeWaitForInflightQuota();
} catch (StatusRuntimeException ex) {
--this.inflightRequests;
this.inflightRequests.decrementAndGet();
waitingRequestQueue.pollLast();
this.inflightBytes -= requestWrapper.messageSize;
this.inflightBytes.addAndGet(-requestWrapper.messageSize);
throw ex;
}
requestProfilerHook.endOperation(
Expand All @@ -954,8 +958,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
@GuardedBy("lock")
private void maybeWaitForInflightQuota() {
long start_time = System.currentTimeMillis();
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
while (this.inflightRequests.get() >= this.maxInflightRequests
|| this.inflightBytes.get() >= this.maxInflightBytes) {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -998,6 +1002,11 @@ void setTestOnlyRunTimeExceptionInAppendLoop(
this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop;
}

@VisibleForTesting
Instant getEarliestSendTime() {
return trackRequestQueueEarliestSendTime.getEarliestSendTime();
}

@VisibleForTesting()
HealthCheckMetrics.HealthCheckFields gatherTestOnlyHealthCheckMetrics() {
this.lock.lock();
Expand Down Expand Up @@ -1229,7 +1238,9 @@ private void appendLoop() {
firstRequestForTableOrSchemaSwitch = true;
}
while (!localQueue.isEmpty()) {
localQueue.peekFirst().setRequestSendQueueTime();
AppendRequestAndResponse head = localQueue.peekFirst();
head.setRequestSendQueueTime();
trackRequestQueueEarliestSendTime.captureEarliest(head.requestSendTimeStamp);
AppendRequestAndResponse wrapper = localQueue.pollFirst();
AppendRowsRequest originalRequest = wrapper.message;
String requestUniqueId = wrapper.requestUniqueId;
Expand Down Expand Up @@ -1642,6 +1653,9 @@ private void requestCallback(AppendRowsResponse response) {
if (response.hasError()) {
if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) {
log.info("Attempting to retry on error: " + response.getError().toString());
// Note that if we are retrying a request it is still in the system so we don't refresh the
// earliest send time. That way we can keep track of the earliest send time based on the
// first time the request was sent, which gives us a better idea of load on this worker.
return;
}
}
Expand All @@ -1653,6 +1667,10 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}
}
// Since we have processed a response and have now removed that request from the system, go
// ahead and refresh the earliest send time, based on the remaining requests that are
// outstanding.
trackRequestQueueEarliestSendTime.discardAndRefresh();

// We need a separate thread pool to unblock the next request callback.
// Otherwise user may call append inside request callback, which may be blocked on waiting
Expand Down Expand Up @@ -1788,8 +1806,8 @@ private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
AppendRequestAndResponse requestWrapper =
pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll();
requestWrapper.requestSendTimeStamp = null;
--this.inflightRequests;
this.inflightBytes -= requestWrapper.messageSize;
this.inflightRequests.decrementAndGet();
this.inflightBytes.addAndGet(-requestWrapper.messageSize);
this.inflightReduced.signal();
return requestWrapper;
}
Expand Down Expand Up @@ -1881,9 +1899,15 @@ void setRequestSendQueueTime() {

/** Returns the current workload of this worker. */
public Load getLoad() {
Duration timeSinceLastCallback = Duration.ZERO;
Instant earliestSendTime = trackRequestQueueEarliestSendTime.getEarliestSendTime();
if (earliestSendTime != null) {
timeSinceLastCallback = Duration.between(earliestSendTime, Instant.now());
}
return Load.create(
inflightBytes,
inflightRequests,
timeSinceLastCallback,
inflightBytes.get(),
inflightRequests.get(),
destinationSet.size(),
maxInflightBytes,
maxInflightRequests);
Expand All @@ -1896,11 +1920,15 @@ public Load getLoad() {
@AutoValue
public abstract static class Load {

// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
// Consider the load on this worker to be overwhelmed when above some inflight latency or
// percentage of in-flight bytes or in-flight requests count.
private static Duration overwhelmedTimeSinceLastCallback = Duration.ofSeconds(3);
private static double overwhelmedInflightCount = 0.2;
private static double overwhelmedInflightBytes = 0.2;

// Time we have spent waiting for a response in the worker.
abstract Duration timeSinceLastCallback();

// Number of in-flight requests bytes in the worker.
abstract long inFlightRequestsBytes();

Expand All @@ -1917,12 +1945,14 @@ public abstract static class Load {
abstract long maxInflightCount();

static Load create(
Duration timeSinceLastCallback,
long inFlightRequestsBytes,
long inFlightRequestsCount,
long destinationCount,
long maxInflightBytes,
long maxInflightCount) {
return new AutoValue_ConnectionWorker_Load(
timeSinceLastCallback,
inFlightRequestsBytes,
inFlightRequestsCount,
destinationCount,
Expand All @@ -1934,20 +1964,29 @@ boolean isOverwhelmed() {
// Consider only in flight bytes and count for now, as by experiment those two are the most
// efficient and has great simplity.
return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount()
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes();
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes()
|| timeSinceLastCallback().compareTo(overwhelmedTimeSinceLastCallback) > 0;
}

// Compares two different load. First compare in flight request bytes split by size 1024 bucket.
// Compares two different load. First compare the timeSinceLastCallback bucketed into 1 second
// intervals.
// Then compare in flight request bytes split by size 1024 bucket.
// Then compare the inflight requests count.
// Then compare destination count of the two connections.
public static final Comparator<Load> LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
Comparator.comparing((Load key) -> (int) key.timeSinceLastCallback().getSeconds())
.thenComparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
.thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100))
.thenComparing(Load::destinationCount);

// Compares two different load without bucket, used in smaller scale unit testing.
// First compare the timeSinceLastCallback.
// Then compare in flight request bytes.
// Then compare the inflight requests count.
// Then compare destination count of the two connections.
public static final Comparator<Load> TEST_LOAD_COMPARATOR =
Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes())
Comparator.comparing(Load::timeSinceLastCallback)
.thenComparing((Load key) -> (int) key.inFlightRequestsBytes())
.thenComparing((Load key) -> (int) key.inFlightRequestsCount())
.thenComparing(Load::destinationCount);

Expand All @@ -1960,6 +1999,11 @@ public static void setOverwhelmedBytesThreshold(double newThreshold) {
public static void setOverwhelmedCountsThreshold(double newThreshold) {
overwhelmedInflightCount = newThreshold;
}

@VisibleForTesting
public static void setOverwhelmedTimeSinceLastCallbackThreshold(Duration newThreshold) {
overwhelmedTimeSinceLastCallback = newThreshold;
}
}

@VisibleForTesting
Expand All @@ -1985,4 +2029,36 @@ static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedS
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
}
}

class TrackRequestQueueEarliestSendTime {
private final AtomicReference<Instant> earliestSendTime = new AtomicReference<>(null);

public void captureEarliest(Instant sendTime) {
// This method records the given sendTime only if earliestSendTime is currently NULL.
if (sendTime == null) {
return;
}
earliestSendTime.compareAndSet(null, sendTime);
}

public void discardAndRefresh() {
Instant newEarliestSendTime = null;
lock.lock();
try {
if (!inflightRequestQueue.isEmpty()) {
AppendRequestAndResponse head = inflightRequestQueue.peekFirst();
if (head != null) {
newEarliestSendTime = head.requestSendTimeStamp;
}
}
} finally {
lock.unlock();
}
earliestSendTime.set(newEarliestSendTime);
}

public Instant getEarliestSendTime() {
return earliestSendTime.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void setUp() throws Exception {
.build();
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3));
}

@Test
Expand Down Expand Up @@ -555,6 +556,55 @@ private ProtoRows createProtoRows(String[] messages) {
return rowsBuilder.build();
}

@Test
void testSingleTableConnections_overwhelmed_timeSinceLastCallback() throws Exception {
// Set count/bytes thresholds to be very high so they don't trigger.
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.9);
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.9);
// Set time threshold to 100ms.
ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofMillis(100));

// We use a pool with max 8 connections.
ConnectionWorkerPool.setOptions(
Settings.builder()
.setMinConnectionsPerRegion(1) // Start with 1 connection to make scaling obvious.
.setMaxConnectionsPerRegion(8)
.build());

// We set maxRequests to a large value (100) so it's not overwhelmed by count (threshold 90).
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(
/* maxRequests= */ 100, /* maxBytes= */ 1000000, java.time.Duration.ofSeconds(5));

// Stuck requests for 500ms (larger than 100ms threshold).
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));

// Send 1 request. It will go to Connection 1.
testBigQueryWrite.addResponse(createAppendResponse(0));
StreamWriter writer = getTestStreamWriter(TEST_STREAM_1);

ApiFuture<AppendRowsResponse> future1 =
sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"0"}, 0);

// Wait 500ms. Request 1 is still in flight (needs 1000ms).
// Connection 1 timeSinceLastCallback should be ~500ms > 100ms.
// So Connection 1 is now overwhelmed.
Thread.sleep(500);

// Send Request 2. Since Connection 1 is overwhelmed, it should scale up and create Connection
// 2.
testBigQueryWrite.addResponse(createAppendResponse(1));
ApiFuture<AppendRowsResponse> future2 =
sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"1"}, 1);

// Wait for both to finish.
future1.get();
future2.get();

// Verify that we created 2 connections.
assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(2);
}

ConnectionWorkerPool createConnectionWorkerPool(
long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
ConnectionWorkerPool.enableTestingLogic();
Expand Down
Loading
Loading