Skip to content
1 change: 1 addition & 0 deletions java-bigquery-jdbc/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ target-it/**
**/*logs*/**
**/ITBigQueryJDBCLocalTest.java
**/BigQueryStatementE2EBenchmark.java
.agents/

tools/**/*.class
tools/**/drivers/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -209,6 +210,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Boolean reqGoogleDriveScope;
private final Properties clientInfo = new Properties();
private boolean isReadOnlyTokenUsed = false;
private final ExecutorService metadataExecutor;
private final ExecutorService queryExecutor;

BigQueryConnection(String url) throws IOException {
this(url, DataSource.fromUrl(url));
Expand Down Expand Up @@ -344,6 +347,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {

this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
}
}

Expand Down Expand Up @@ -937,23 +942,91 @@ public void close() throws SQLException {
}

private void closeImpl() throws SQLException {
SQLException exceptionToThrow = null;
try {
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.shutdown();
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
this.bigQueryReadClient.close();
}

if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.shutdown();
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
this.bigQueryWriteClient.close();
}
if (this.metadataExecutor != null) {
Comment thread
Neenu1995 marked this conversation as resolved.
this.metadataExecutor.shutdown();
}
if (this.queryExecutor != null) {
this.queryExecutor.shutdown();
}

for (Statement statement : this.openStatements) {
statement.close();
try {
statement.close();
} catch (SQLException e) {
if (exceptionToThrow == null) {
exceptionToThrow = e;
} else {
exceptionToThrow.addSuppressed(e);
}
}
}
this.openStatements.clear();

boolean interrupted = Thread.currentThread().isInterrupted();

try {
if (this.bigQueryReadClient != null) {
if (interrupted) {
this.bigQueryReadClient.shutdownNow();
} else {
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
}
}
if (this.bigQueryWriteClient != null) {
if (interrupted) {
this.bigQueryWriteClient.shutdownNow();
} else {
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
}
}
if (this.metadataExecutor != null) {
if (interrupted || !this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.metadataExecutor.shutdownNow();
}
}
if (this.queryExecutor != null) {
if (interrupted || !this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.queryExecutor.shutdownNow();
}
}
} catch (InterruptedException e) {
interrupted = true;
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.shutdownNow();
}
if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.shutdownNow();
}
if (this.metadataExecutor != null) {
this.metadataExecutor.shutdownNow();
}
if (this.queryExecutor != null) {
this.queryExecutor.shutdownNow();
}
} finally {
try {
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.close();
}
} finally {
if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.close();
}
}
}

if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException("Interrupted awaiting executor termination");
}
Comment thread
Neenu1995 marked this conversation as resolved.
} catch (ConcurrentModificationException ex) {
throw new BigQueryJdbcException("Concurrent modification during close", ex);
} catch (InterruptedException e) {
Expand All @@ -962,9 +1035,20 @@ private void closeImpl() throws SQLException {
BigQueryJdbcMdc.clear();
BigQueryJdbcRootLogger.closeConnectionHandler(this.connectionId);
}
if (exceptionToThrow != null) {
throw exceptionToThrow;
}
this.isClosed = true;
}

ExecutorService getExecutorService() {
return this.queryExecutor;
}

ExecutorService getMetadataExecutor() {
return this.metadataExecutor;
}

@Override
public boolean isClosed() {
return this.isClosed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
class BigQueryJdbcMdc {
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName());

private static final InheritableThreadLocal<String> currentConnectionId =
new InheritableThreadLocal<>();

Expand Down Expand Up @@ -56,13 +60,16 @@ static void clear() {
* context from the submitting thread to the executing thread.
*/
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new MdcThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new MdcThreadFactory(threadFactory));
MdcThreadPoolExecutor executor =
new MdcThreadPoolExecutor(
nThreads,
nThreads,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new MdcThreadFactory(threadFactory));
executor.allowCoreThreadTimeOut(true);
return executor;
}

/**
Expand All @@ -73,6 +80,28 @@ static ExecutorService newFixedThreadPool(int nThreads) {
return newFixedThreadPool(nThreads, Executors.defaultThreadFactory());
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new MdcThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new java.util.concurrent.SynchronousQueue<>(),
new MdcThreadFactory(threadFactory));
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool() {
return newCachedThreadPool(Executors.defaultThreadFactory());
}

private static class MdcThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;

Expand All @@ -82,11 +111,16 @@ public MdcThreadFactory(ThreadFactory delegate) {

@Override
public Thread newThread(Runnable r) {
return delegate.newThread(
() -> {
clear();
r.run();
});
Thread t =
delegate.newThread(
() -> {
clear();
r.run();
});
if (t != null) {
t.setDaemon(true);
}
return t;
}
}

Expand All @@ -102,11 +136,37 @@ public MdcThreadPoolExecutor(
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

private final AtomicBoolean warningLogged = new AtomicBoolean(false);

private void monitorQueueSaturation(int queueSize) {
int maxPoolSize = getMaximumPoolSize();
// Warn when queue size is >= maxPoolSize * 5, with a minimum of 10 tasks to avoid false
// alerts for tiny pools
int warnThreshold = Math.max(10, maxPoolSize * 5);
// Recovery reset threshold is maxPoolSize * 2, with a minimum of 4 tasks
int recoveryThreshold = Math.max(4, maxPoolSize * 2);

if (queueSize >= warnThreshold) {
if (warningLogged.compareAndSet(false, true)) {
LOG.warning(
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
maxPoolSize, getActiveCount(), queueSize);
}
} else if (queueSize <= recoveryThreshold) {
if (warningLogged.get()) {
warningLogged.set(false);
}
}
Comment thread
Neenu1995 marked this conversation as resolved.
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}

monitorQueueSaturation(getQueue().size());

if (command instanceof MdcFutureTask) {
super.execute(command);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,8 +1077,9 @@ public Integer getMetadataFetchThreadCount() {

public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) {
if (metadataFetchThreadCount != null) {
validateNonNegative(
validateMin(
metadataFetchThreadCount,
1,
BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME);
}
this.metadataFetchThreadCount = metadataFetchThreadCount;
Expand Down Expand Up @@ -1387,4 +1388,13 @@ private static void validateNonNegative(long val, String propertyName) {
"Invalid value for %s. It must be greater than or equal to 0.", propertyName));
}
}

/** Validates that a property value is greater than or equal to a minimum threshold. */
private static void validateMin(long val, long min, String propertyName) {
if (val < min) {
throw new BigQueryJdbcRuntimeException(
String.format(
"Invalid value for %s. It must be greater than or equal to %d.", propertyName, min));
}
}
}
Loading
Loading