diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index 64269c7f74be..e4ba837643f8 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; @@ -80,8 +81,8 @@ class BigQueryArrowResultSet extends BigQueryBaseResultSet { // Decoder object will be reused to avoid re-allocation and too much garbage collection. private VectorSchemaRoot vectorSchemaRoot; private VectorLoader vectorLoader; - // producer thread's reference - private final Thread ownedThread; + // producer task's reference + private final Future ownedTask; private BigQueryArrowResultSet( Schema schema, @@ -93,7 +94,7 @@ private BigQueryArrowResultSet( boolean isNested, int fromIndex, int toIndexExclusive, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery, Job job) throws SQLException { @@ -105,7 +106,7 @@ private BigQueryArrowResultSet( this.fromIndex = fromIndex; this.toIndexExclusive = toIndexExclusive; this.nestedRowIndex = fromIndex - 1; - this.ownedThread = ownedThread; + this.ownedTask = ownedTask; if (!isNested && arrowSchema != null) { try { this.arrowDeserializer = new ArrowDeserializer(arrowSchema); @@ -127,10 +128,10 @@ static BigQueryArrowResultSet of( long totalRows, BigQueryStatement statement, BlockingQueue buffer, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery) throws SQLException { - return of(schema, arrowSchema, totalRows, statement, buffer, ownedThread, bigQuery, null); + return of(schema, arrowSchema, totalRows, statement, buffer, ownedTask, bigQuery, null); } static BigQueryArrowResultSet of( @@ -139,7 +140,7 @@ static BigQueryArrowResultSet of( long totalRows, BigQueryStatement statement, BlockingQueue buffer, - Thread ownedThread, + Future ownedTask, BigQuery bigQuery, Job job) throws SQLException { @@ -153,7 +154,7 @@ static BigQueryArrowResultSet of( false, -1, -1, - ownedThread, + ownedTask, bigQuery, job); } @@ -165,7 +166,7 @@ static BigQueryArrowResultSet of( this.currentNestedBatch = null; this.fromIndex = 0; this.toIndexExclusive = 0; - this.ownedThread = null; + this.ownedTask = null; this.arrowDeserializer = null; this.vectorSchemaRoot = null; this.vectorLoader = null; @@ -484,9 +485,9 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp public void close() { LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this)); this.isClosed = true; - if (ownedThread != null && !ownedThread.isInterrupted()) { - // interrupt the producer thread when result set is closed - ownedThread.interrupt(); + if (ownedTask != null) { + // cancel the producer task when result set is closed + ownedTask.cancel(true); } super.close(); } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 586a5c329405..cbbf50acc392 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -348,7 +348,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount); - this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool(); + // Use a bounded cached thread pool to prevent unbounded thread creation (and OOMs) + // under heavy load, while ensuring a limit (e.g., 100) high enough to prevent deadlocks + // between interdependent producer/consumer tasks (like nextPageWorker and + // populateBufferWorker). + this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(100); } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 32ed62d91fd6..83ed7b30c6d5 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -72,6 +72,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -944,7 +945,7 @@ public ResultSet getProcedures( Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getProcedures"); @@ -1206,7 +1207,7 @@ public ResultSet getProcedureColumns( Thread fetcherThread = new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog); @@ -1877,7 +1878,7 @@ public ResultSet getTables( Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getTables"); @@ -2017,7 +2018,8 @@ public ResultSet getCatalogs() { populateQueue(catalogRows, queue, schemaFields); signalEndOfData(queue, schemaFields); - return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]); + return BigQueryJsonResultSet.of( + catalogsSchema, catalogRows.size(), queue, null, new Future[0]); } Schema defineGetCatalogsSchema() { @@ -2049,7 +2051,7 @@ public ResultSet getTableTypes() { signalEndOfData(queue, tableTypesSchema.getFields()); return BigQueryJsonResultSet.of( - tableTypesSchema, tableTypeRows.size(), queue, null, new Thread[0]); + tableTypesSchema, tableTypeRows.size(), queue, null, new Future[0]); } static Schema defineGetTableTypesSchema() { @@ -2203,7 +2205,7 @@ public ResultSet getColumns( Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getColumns"); @@ -2718,7 +2720,7 @@ public ResultSet getTypeInfo() { populateQueue(typeInfoRows, queue, schemaFields); signalEndOfData(queue, schemaFields); return BigQueryJsonResultSet.of( - typeInfoSchema, typeInfoRows.size(), queue, null, new Thread[0]); + typeInfoSchema, typeInfoRows.size(), queue, null, new Future[0]); } Schema defineGetTypeInfoSchema() { @@ -3713,7 +3715,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getSchemas"); @@ -3832,7 +3834,7 @@ public ResultSet getClientInfoProperties() { signalEndOfData(queue, resultSchemaFields); } return BigQueryJsonResultSet.of( - resultSchema, collectedResults.size(), queue, null, new Thread[0]); + resultSchema, collectedResults.size(), queue, null, new Future[0]); } Schema defineGetClientInfoPropertiesSchema() { @@ -4007,7 +4009,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getFunctions"); @@ -4261,7 +4263,7 @@ public ResultSet getFunctionColumns( Thread fetcherThread = new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog); BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread)); fetcherThread.start(); LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog); @@ -5264,4 +5266,88 @@ private void loadDriverVersionProperties() { throw ex; } } + + // TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures. + // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService + // directly. + static Future[] wrapThread(final Thread thread) { + if (thread == null) { + return null; + } + return new Future[] { + new Future() { + private volatile boolean cancelled = false; + + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (cancelled || thread.getState() == Thread.State.TERMINATED) { + return false; + } + cancelled = true; + if (mayInterruptIfRunning) { + thread.interrupt(); + } + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return cancelled || thread.getState() == Thread.State.TERMINATED; + } + + @Override + public Object get() throws InterruptedException, CancellationException { + if (isCancelled()) { + throw new CancellationException(); + } + while (thread.getState() != Thread.State.TERMINATED) { + if (isCancelled()) { + throw new CancellationException(); + } + if (thread.getState() == Thread.State.NEW) { + Thread.sleep(50); + } else { + thread.join(50); + } + } + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) + throws InterruptedException, CancellationException, TimeoutException { + if (isCancelled()) { + throw new CancellationException(); + } + long remainingNanos = unit.toNanos(timeout); + long deadline = System.nanoTime() + remainingNanos; + while (thread.getState() != Thread.State.TERMINATED) { + if (isCancelled()) { + throw new CancellationException(); + } + if (remainingNanos <= 0) { + throw new TimeoutException(); + } + long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + if (remainingMillis <= 0) { + remainingMillis = 1; + } + long delay = Math.min(remainingMillis, 50); + if (thread.getState() == Thread.State.NEW) { + Thread.sleep(delay); + } else { + thread.join(delay); + } + remainingNanos = deadline - System.nanoTime(); + } + return null; + } + } + }; + } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java index 5f2416753c4d..61326e809d4a 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java @@ -81,13 +81,13 @@ static ExecutorService newFixedThreadPool(int nThreads) { } /** - * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection - * context from the submitting thread to the executing thread. + * Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC + * connection context from the submitting thread to the executing thread. */ - static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + static ExecutorService newBoundedCachedThreadPool(int maxThreads, ThreadFactory threadFactory) { return new MdcThreadPoolExecutor( 0, - Integer.MAX_VALUE, + maxThreads, 60L, TimeUnit.SECONDS, new java.util.concurrent.SynchronousQueue<>(), @@ -95,11 +95,11 @@ static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { } /** - * Creates a new cached thread pool ExecutorService that automatically propagates MDC connection - * context from the submitting thread to the executing thread. + * Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC + * connection context from the submitting thread to the executing thread. */ - static ExecutorService newCachedThreadPool() { - return newCachedThreadPool(Executors.defaultThreadFactory()); + static ExecutorService newBoundedCachedThreadPool(int maxThreads) { + return newBoundedCachedThreadPool(maxThreads, Executors.defaultThreadFactory()); } private static class MdcThreadFactory implements ThreadFactory { diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index eeb4baf2d03e..46f70b965c39 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -29,6 +29,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; /** {@link ResultSet} Implementation for JSON datasource (Using REST APIs) */ class BigQueryJsonResultSet extends BigQueryBaseResultSet { @@ -43,7 +44,7 @@ class BigQueryJsonResultSet extends BigQueryBaseResultSet { private boolean afterLast = false; private final int fromIndex; private final int toIndexExclusive; - private final Thread[] ownedThreads; + private final Future[] ownedTasks; private BigQueryJsonResultSet( Schema schema, @@ -54,7 +55,7 @@ private BigQueryJsonResultSet( BigQueryFieldValueListWrapper cursor, int fromIndex, int toIndexExclusive, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery, Job job) { super(bigQuery, statement, schema, isNested, job); @@ -64,7 +65,7 @@ private BigQueryJsonResultSet( this.fromIndex = fromIndex; this.toIndexExclusive = toIndexExclusive; this.nestedRowIndex = fromIndex - 1; - this.ownedThreads = ownedThreads; + this.ownedTasks = ownedTasks; } /** @@ -78,10 +79,10 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery) { - return of(schema, totalRows, buffer, statement, ownedThreads, bigQuery, null); + return of(schema, totalRows, buffer, statement, ownedTasks, bigQuery, null); } static BigQueryJsonResultSet of( @@ -89,12 +90,12 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads, + Future[] ownedTasks, BigQuery bigQuery, Job job) { return new BigQueryJsonResultSet( - schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, bigQuery, job); + schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, bigQuery, job); } static BigQueryJsonResultSet of( @@ -102,10 +103,10 @@ static BigQueryJsonResultSet of( long totalRows, BlockingQueue buffer, BigQueryStatement statement, - Thread[] ownedThreads) { + Future[] ownedTasks) { return new BigQueryJsonResultSet( - schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, null, null); + schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, null, null); } BigQueryJsonResultSet() { @@ -113,7 +114,7 @@ static BigQueryJsonResultSet of( totalRows = 0; buffer = null; fromIndex = 0; - ownedThreads = new Thread[0]; + ownedTasks = new Future[0]; toIndexExclusive = 0; } @@ -291,10 +292,10 @@ private FieldValue getObjectInternal(int columnIndex) throws SQLException { public void close() { LOG.fineTrace("close", () -> String.format("Closing BigqueryJsonResultSet %s.", this)); this.isClosed = true; - if (ownedThreads != null) { - for (Thread ownedThread : ownedThreads) { - if (!ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTasks != null) { + for (Future ownedTask : ownedTasks) { + if (ownedTask != null) { + ownedTask.cancel(true); } } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java index 85a00214376f..eefdafcea84b 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizers.java @@ -19,6 +19,7 @@ import com.google.api.core.InternalApi; import java.lang.ref.PhantomReference; import java.lang.ref.ReferenceQueue; +import java.util.concurrent.Future; @InternalApi class BigQueryResultSetFinalizers { @@ -27,44 +28,44 @@ class BigQueryResultSetFinalizers { @InternalApi static class ArrowResultSetFinalizer extends PhantomReference { - Thread ownedThread; + Future ownedTask; public ArrowResultSetFinalizer( BigQueryArrowResultSet referent, ReferenceQueue q, - Thread ownedThread) { + Future ownedTask) { super(referent, q); - this.ownedThread = ownedThread; + this.ownedTask = ownedTask; } // Free resources. Remove all the hard refs public void finalizeResources() { LOG.finestTrace("finalizeResources"); - if (ownedThread != null && !ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTask != null) { + ownedTask.cancel(true); } } } @InternalApi static class JsonResultSetFinalizer extends PhantomReference { - Thread[] ownedThreads; + Future[] ownedTasks; public JsonResultSetFinalizer( BigQueryJsonResultSet referent, ReferenceQueue q, - Thread[] ownedThreads) { + Future[] ownedTasks) { super(referent, q); - this.ownedThreads = ownedThreads; + this.ownedTasks = ownedTasks; } // Free resources. Remove all the hard refs public void finalizeResources() { LOG.finestTrace("finalizeResources"); - if (ownedThreads != null) { - for (Thread ownedThread : ownedThreads) { - if (!ownedThread.isInterrupted()) { - ownedThread.interrupt(); + if (ownedTasks != null) { + for (Future ownedTask : ownedTasks) { + if (ownedTask != null) { + ownedTask.cancel(true); } } } diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 0d4d94175b8d..d2908c314d19 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -76,8 +76,9 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; @@ -90,10 +91,6 @@ */ public class BigQueryStatement extends BigQueryNoOpsStatement { - // TODO (obada): Update this after benchmarking - private static final int MAX_PROCESS_QUERY_THREADS_CNT = 50; - protected static ExecutorService queryTaskExecutor = - Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT); private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); public static final int DEFAULT_BUFFER_SIZE = BigQuerySettings.DEFAULT_NUM_BUFFERED_ROWS * 2; private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; @@ -813,6 +810,7 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio JobId currentJobId = results.getJobId(); TableId destinationTable = getDestinationTable(currentJobId); Schema schema = results.getSchema(); + Future populateBufferWorker = null; try { String parent = String.format("projects/%s", destinationTable.getProject()); String srcTable = @@ -836,9 +834,9 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio ReadSession readSession = getReadSession(builder.build()); this.arrowBatchWrapperBlockingQueue = new LinkedBlockingDeque<>(getBufferSize()); // deserialize and populate the buffer async, so that the client isn't blocked - Thread populateBufferWorker = + populateBufferWorker = populateArrowBufferedQueue( - readSession, this.arrowBatchWrapperBlockingQueue, this.bigQueryReadClient); + readSession, this.arrowBatchWrapperBlockingQueue, getBigQueryReadClient()); BigQueryArrowResultSet arrowResultSet = BigQueryArrowResultSet.of( @@ -857,19 +855,39 @@ ResultSet processArrowResultSet(TableResult results, Job job) throws SQLExceptio arrowResultSet.setQueryId(results.getQueryId()); return arrowResultSet; - } catch (Exception ex) { + } catch (Exception | OutOfMemoryError ex) { + if (populateBufferWorker != null) { + populateBufferWorker.cancel(true); + } + if (ex instanceof OutOfMemoryError || ex instanceof RejectedExecutionException) { + throw new BigQueryJdbcException( + "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", + ex); + } + if (ex instanceof RuntimeException) { + throw (ex instanceof BigQueryJdbcRuntimeException) + ? (BigQueryJdbcRuntimeException) ex + : new BigQueryJdbcRuntimeException(ex); + } + if (ex instanceof SQLException) { + throw (ex instanceof BigQueryJdbcException) + ? (BigQueryJdbcException) ex + : new BigQueryJdbcException(ex); + } throw new BigQueryJdbcException(ex.getMessage(), ex); } } /** Asynchronously reads results and populates an arrow record queue */ @InternalApi - Thread populateArrowBufferedQueue( + Future populateArrowBufferedQueue( ReadSession readSession, BlockingQueue arrowBatchWrapperBlockingQueue, BigQueryReadClient bqReadClient) { LOG.finer("++enter++"); + ExecutorService executor = connection.getExecutorService(); + Runnable arrowStreamProcessor = () -> { long rowsRead = 0; @@ -890,7 +908,7 @@ Thread populateArrowBufferedQueue( com.google.api.gax.rpc.ServerStream stream = bqReadClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { break; } @@ -952,9 +970,7 @@ Thread populateArrowBufferedQueue( } }; - Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); - populateBufferWorker.start(); - return populateBufferWorker; + return executor.submit(arrowStreamProcessor); } /** Executes SQL query using either fast query path or read API */ @@ -1038,8 +1054,8 @@ private boolean meetsReadRatio(TableResult results) { return totalRows / pageSize > querySettings.getHighThroughputActivationRatio(); } - BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { - List threadList = new ArrayList(); + BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) throws SQLException { + List> taskList = new ArrayList<>(); Schema schema = results.getSchema(); long totalRows = (getMaxRows() > 0) ? getMaxRows() : results.getTotalRows(); @@ -1048,34 +1064,60 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results, Job job) { new LinkedBlockingDeque<>(getPageCacheSize(getBufferSize(), schema)); JobId jobId = results.getJobId(); - if (jobId != null) { - // Thread to make rpc calls to fetch data from the server - Thread nextPageWorker = - runNextPageTaskAsync( - results, - results.getNextPageToken(), - jobId, - rpcResponseQueue, - this.bigQueryFieldValueListWrapperBlockingQueue); - threadList.add(nextPageWorker); - } else { - try { - populateFirstPage(results, rpcResponseQueue); - rpcResponseQueue.put(Tuple.of(null, false)); - } catch (InterruptedException e) { - LOG.warning( - "%s Interrupted @ processJsonQueryResponseResults: %s", - Thread.currentThread().getName(), e.getMessage()); + try { + if (jobId != null) { + // Task to make rpc calls to fetch data from the server + Future nextPageWorker = + runNextPageTaskAsync( + results, + results.getNextPageToken(), + jobId, + rpcResponseQueue, + this.bigQueryFieldValueListWrapperBlockingQueue); + taskList.add(nextPageWorker); + } else { + try { + populateFirstPage(results, rpcResponseQueue); + rpcResponseQueue.put(Tuple.of(null, false)); + } catch (InterruptedException e) { + LOG.warning( + "%s Interrupted @ processJsonQueryResponseResults: %s", + Thread.currentThread().getName(), e.getMessage()); + Thread.currentThread().interrupt(); + throw new BigQueryJdbcException("Query execution was interrupted.", e); + } } - } - // Thread to parse data received from the server to client library objects - Thread populateBufferWorker = - parseAndPopulateRpcDataAsync( - schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); - threadList.add(populateBufferWorker); + // Task to parse data received from the server to client library objects + Future populateBufferWorker = + parseAndPopulateRpcDataAsync( + schema, this.bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue); + taskList.add(populateBufferWorker); + } catch (Exception | OutOfMemoryError e) { + for (Future task : taskList) { + if (task != null) { + task.cancel(true); + } + } + if (e instanceof RejectedExecutionException || e instanceof OutOfMemoryError) { + throw new BigQueryJdbcException( + "Failed to execute query: Unable to allocate background threads to process the query results. Connection-scoped thread pool limit of 100 threads was reached or system is out of memory.", + e); + } + if (e instanceof RuntimeException) { + throw (e instanceof BigQueryJdbcRuntimeException) + ? (BigQueryJdbcRuntimeException) e + : new BigQueryJdbcRuntimeException(e); + } + if (e instanceof SQLException) { + throw (e instanceof BigQueryJdbcException) + ? (BigQueryJdbcException) e + : new BigQueryJdbcException(e); + } + throw new BigQueryJdbcException(e.getMessage(), e); + } - Thread[] jsonWorkers = threadList.toArray(new Thread[0]); + Future[] jsonWorkers = taskList.toArray(new Future[0]); BigQueryJsonResultSet jsonResultSet = BigQueryJsonResultSet.of( @@ -1118,7 +1160,7 @@ public void setFetchDirection(int direction) throws SQLException { } @VisibleForTesting - Thread runNextPageTaskAsync( + Future runNextPageTaskAsync( TableResult result, String firstPageToken, JobId jobId, @@ -1129,6 +1171,8 @@ Thread runNextPageTaskAsync( // calls populateFirstPage(result, rpcResponseQueue); + ExecutorService executor = connection.getExecutorService(); + // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { @@ -1142,7 +1186,7 @@ Thread runNextPageTaskAsync( try { while (currentPageToken != null) { // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { LOG.warning( "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); break; @@ -1177,9 +1221,7 @@ Thread runNextPageTaskAsync( // have finished processing the records and even that will be interrupted }; - Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask); - nextPageWorker.start(); - return nextPageWorker; + return executor.submit(nextPageTask); } /** @@ -1187,12 +1229,14 @@ Thread runNextPageTaskAsync( * bigQueryFieldValueListWrapperBlockingQueue with FieldValueList */ @VisibleForTesting - Thread parseAndPopulateRpcDataAsync( + Future parseAndPopulateRpcDataAsync( Schema schema, BlockingQueue bigQueryFieldValueListWrapperBlockingQueue, BlockingQueue> rpcResponseQueue) { LOG.finer("++enter++"); + ExecutorService executor = connection.getExecutorService(); + Runnable populateBufferRunnable = () -> { // producer thread populating the buffer try { @@ -1217,7 +1261,7 @@ Thread parseAndPopulateRpcDataAsync( } if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() + || executor.isShutdown() || fieldValueLists == null) { // do not process further pages and shutdown (outerloop) break; @@ -1227,7 +1271,7 @@ Thread parseAndPopulateRpcDataAsync( long results = 0; for (FieldValueList fieldValueList : fieldValueLists) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() || executor.isShutdown()) { // do not process further pages and shutdown (inner loop) break; } @@ -1262,9 +1306,7 @@ Thread parseAndPopulateRpcDataAsync( } }; - Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); - populateBufferWorker.start(); - return populateBufferWorker; + return executor.submit(populateBufferRunnable); } /** diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java index 93eb573f3c35..1ffd05ff4cd6 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSetTest.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.TimeZone; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.stream.Stream; import org.apache.arrow.memory.RootAllocator; @@ -237,10 +238,10 @@ public void setUp() throws SQLException, IOException { ArrowSchema.newBuilder() .setSerializedSchema(serializeSchema(vectorSchemaRoot.getSchema())) .build(); - Thread workerThread = new Thread(); + Future workerTask = mock(Future.class); bigQueryArrowResultSet = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerTask, null); // nested result set data setup JsonStringArrayList jsonStringArrayList = getJsonStringArrayList(); @@ -275,17 +276,17 @@ public void testRowCount() throws SQLException, IOException { ArrowSchema.newBuilder() .setSerializedSchema(serializeSchema(vectorSchemaRoot.getSchema())) .build(); - Thread workerThread = new Thread(); + Future workerTask = mock(Future.class); // ResultSet with 1 row buffer and 1 total rows. BigQueryArrowResultSet bigQueryArrowResultSet2 = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, buffer, workerTask, null); assertThat(resultSetRowCount(bigQueryArrowResultSet2)).isEqualTo(1); // ResultSet with 2 rows buffer and 1 total rows. bigQueryArrowResultSet2 = BigQueryArrowResultSet.of( - QUERY_SCHEMA, arrowSchema, 1, statement, bufferWithTwoRows, workerThread, null); + QUERY_SCHEMA, arrowSchema, 1, statement, bufferWithTwoRows, workerTask, null); assertThat(resultSetRowCount(bigQueryArrowResultSet2)).isEqualTo(1); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 58a5a7212066..9b2b82644c35 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -44,9 +44,13 @@ import java.sql.Types; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy assertEquals( metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type); } + + @Test + public void testWrapThread_NullThread() { + assertNull(BigQueryDatabaseMetaData.wrapThread(null)); + } + + @Test + public void testWrapThread_BasicLifecycle() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + try { + startLatch.countDown(); + finishLatch.await(); + } catch (InterruptedException e) { + // ignore + } + }); + + Future[] futures = BigQueryDatabaseMetaData.wrapThread(t); + assertNotNull(futures); + assertEquals(1, futures.length); + Future f = futures[0]; + + // Thread is NEW (not started yet). + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + t.start(); + startLatch.await(); + + // Thread is running. + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + finishLatch.countDown(); + t.join(); + + // Thread is terminated. + assertTrue(f.isDone()); + assertFalse(f.isCancelled()); + assertNull(f.get()); + } + + @Test + public void testWrapThread_CancelBeforeStart() throws Exception { + Thread t = + new Thread( + () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + }); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + // cancel on already cancelled should return false + assertFalse(f.cancel(true)); + + assertThrows(CancellationException.class, () -> f.get()); + assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS)); + } + + @Test + public void testWrapThread_CancelRunningWithInterrupt() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch interruptedLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + interruptedLatch.countDown(); + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS)); + assertThrows(CancellationException.class, () -> f.get()); + } + + @Test + public void testWrapThread_GetTimeout() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // ignore + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS)); + + // Cleanup: stop the thread + t.interrupt(); + t.join(); + } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java index 387e769bae0a..9248fa1578b9 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java @@ -320,9 +320,9 @@ public void testConnectionScopedExecutorLifecycle() throws Exception { assertTrue(metadataExec2 instanceof ThreadPoolExecutor); assertEquals(0, ((ThreadPoolExecutor) exec1).getCorePoolSize()); - assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec1).getMaximumPoolSize()); + assertEquals(100, ((ThreadPoolExecutor) exec1).getMaximumPoolSize()); assertEquals(0, ((ThreadPoolExecutor) exec2).getCorePoolSize()); - assertEquals(Integer.MAX_VALUE, ((ThreadPoolExecutor) exec2).getMaximumPoolSize()); + assertEquals(100, ((ThreadPoolExecutor) exec2).getMaximumPoolSize()); assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getCorePoolSize()); assertEquals(5, ((ThreadPoolExecutor) metadataExec1).getMaximumPoolSize()); assertTrue(((ThreadPoolExecutor) metadataExec1).allowsCoreThreadTimeOut()); diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java index 1e9a3830cd90..b75f8493be80 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSetTest.java @@ -53,6 +53,7 @@ import java.time.LocalTime; import java.util.TimeZone; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -168,9 +169,9 @@ public void setUp() { statement = mock(BigQueryStatement.class); buffer.add(BigQueryFieldValueListWrapper.of(fieldList, fieldValues)); buffer.add(BigQueryFieldValueListWrapper.of(null, null, true)); // last marker - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); // Buffer with 2 rows. bufferWithTwoRows = new LinkedBlockingDeque<>(3); @@ -196,9 +197,9 @@ public void setUp() { private boolean resetResultSet() throws SQLException { // re-initialises the resultset and moves the cursor to the first row - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); return bigQueryJsonResultSet.next(); // move to the first row } @@ -214,15 +215,15 @@ public void testClose() { @Test public void testRowCount() throws SQLException { - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; // ResultSet with 1 row buffer and 1 total rows. BigQueryJsonResultSet bigQueryJsonResultSet2 = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, buffer, statement, workerTasks); assertThat(resultSetRowCount(bigQueryJsonResultSet2)).isEqualTo(1); // ResultSet with 2 rows buffer and 1 total rows. bigQueryJsonResultSet2 = BigQueryJsonResultSet.of( - QUERY_SCHEMA, 1L, bufferWithTwoRows, statementForTwoRows, workerThreads); + QUERY_SCHEMA, 1L, bufferWithTwoRows, statementForTwoRows, workerTasks); assertThat(resultSetRowCount(bigQueryJsonResultSet2)).isEqualTo(1); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java index ee4d6047b931..460e5b68c3c5 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetFinalizersTest.java @@ -16,52 +16,33 @@ package com.google.cloud.bigquery.jdbc; -import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import java.util.concurrent.Future; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class BigQueryResultSetFinalizersTest { - Thread arrowWorker; - Thread[] jsonWorkers; + Future[] jsonWorkers; @BeforeEach public void setUp() { - // create and start the demon threads - arrowWorker = - new Thread( - () -> { - while (true) { - if (Thread.currentThread().isInterrupted()) { - break; - } - } - }); - arrowWorker.setDaemon(true); - Thread jsonWorker = - new Thread( - () -> { - while (true) { - if (Thread.currentThread().isInterrupted()) { - break; - } - } - }); - jsonWorker.setDaemon(true); - jsonWorkers = new Thread[] {jsonWorker}; - arrowWorker.start(); - jsonWorker.start(); + Future mockFuture = mock(Future.class); + jsonWorkers = new Future[] {mockFuture}; } @Test public void testFinalizeResources() { + Future mockFuture = mock(Future.class); BigQueryResultSetFinalizers.ArrowResultSetFinalizer arrowResultSetFinalizer = - new BigQueryResultSetFinalizers.ArrowResultSetFinalizer(null, null, arrowWorker); + new BigQueryResultSetFinalizers.ArrowResultSetFinalizer(null, null, mockFuture); arrowResultSetFinalizer.finalizeResources(); - assertThat(arrowWorker.isInterrupted()).isTrue(); + verify(mockFuture).cancel(true); + BigQueryResultSetFinalizers.JsonResultSetFinalizer jsonResultSetFinalizer = new BigQueryResultSetFinalizers.JsonResultSetFinalizer(null, null, jsonWorkers); jsonResultSetFinalizer.finalizeResources(); - assertThat(jsonWorkers[0].isInterrupted()).isTrue(); + verify(jsonWorkers[0]).cancel(true); } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java index b95bb0e056b5..8261a14dc981 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java @@ -31,6 +31,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.concurrent.Future; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -84,9 +85,9 @@ public class BigQueryResultSetMetadataTest { @BeforeEach public void setUp() throws SQLException { statement = mock(BigQueryStatement.class); - Thread[] workerThreads = {new Thread()}; + Future[] workerTasks = {mock(Future.class)}; BigQueryJsonResultSet bigQueryJsonResultSet = - BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, null, statement, workerThreads); + BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, null, statement, workerTasks); // values for nested types resultSetMetaData = bigQueryJsonResultSet.getMetaData(); @@ -290,7 +291,7 @@ public void testIsSearchableForAllTypes(StandardSQLTypeName type) throws SQLExce FieldList schemaFields = FieldList.of(field); BigQueryJsonResultSet resultSet = BigQueryJsonResultSet.of( - Schema.of(schemaFields), 1L, null, statement, new Thread[] {new Thread()}); + Schema.of(schemaFields), 1L, null, statement, new Future[] {mock(Future.class)}); ResultSetMetaData metaData = resultSet.getMetaData(); assertThat(metaData.isSearchable(1)).isTrue(); } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java index 674eb0df64e0..189aeedfaed0 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java @@ -67,6 +67,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; @@ -150,6 +152,8 @@ public void setUp() throws IOException, SQLException { .when(bigQueryConnection) .getQueryDialect(); doReturn(1000L).when(bigQueryConnection).getMaxResults(); + ExecutorService executorService = mock(ExecutorService.class); + doReturn(executorService).when(bigQueryConnection).getExecutorService(); bigQueryStatement = new BigQueryStatement(bigQueryConnection); VectorSchemaRoot vectorSchemaRoot = getTestVectorSchemaRoot(); arrowSchema = @@ -252,7 +256,7 @@ public void getArrowResultSetTest() throws SQLException { doReturn(readSession) .when(bigQueryStatementSpy) .getReadSession(any(CreateReadSessionRequest.class)); - Thread mockWorker = new Thread(); + Future mockWorker = mock(Future.class); doReturn(mockWorker) .when(bigQueryStatementSpy) .populateArrowBufferedQueue( diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java index fda5112703fd..db77acc58342 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/PerConnectionFileHandlerTest.java @@ -31,6 +31,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Optional; +import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.LogRecord; import org.junit.jupiter.api.AfterEach; @@ -195,7 +196,8 @@ public void testResultSetExceptionLogRouting() throws Exception { // Instantiate a real BigQueryJsonResultSet (which extends BigQueryBaseResultSet) // passing the mock statement carrying connectionId "c789" - BigQueryJsonResultSet rs = BigQueryJsonResultSet.of(schema, 0, null, mockStmt, new Thread[0]); + BigQueryJsonResultSet rs = + BigQueryJsonResultSet.of(schema, 0, null, mockStmt, new Future[0]); // Calling findColumn(null) throws SQLException because column label is null assertThrows(SQLException.class, () -> rs.findColumn(null));