From 847c892247454b1827829859e8983f074d2fc7da Mon Sep 17 00:00:00 2001 From: Blake Li Date: Tue, 2 Jun 2026 22:14:46 +0000 Subject: [PATCH 1/2] feat(library_generation): generic grouping folder for unversioned libraries staging paths --- .../templates/owlbot.yaml.monorepo.j2 | 13 +++++-- ...Bot-hermetic-unversioned-gapic-golden.yaml | 35 +++++++++++++++++ ...ermetic-unversioned-proto-only-golden.yaml | 29 ++++++++++++++ .../tests/utilities_unit_tests.py | 38 ++++++++++++++++++- .../library_generation/utils/utilities.py | 4 ++ 5 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-gapic-golden.yaml create mode 100644 sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-proto-only-golden.yaml diff --git a/sdk-platform-java/hermetic_build/library_generation/templates/owlbot.yaml.monorepo.j2 b/sdk-platform-java/hermetic_build/library_generation/templates/owlbot.yaml.monorepo.j2 index d3f29de32a7e..5d7fcf8ff34c 100644 --- a/sdk-platform-java/hermetic_build/library_generation/templates/owlbot.yaml.monorepo.j2 +++ b/sdk-platform-java/hermetic_build/library_generation/templates/owlbot.yaml.monorepo.j2 @@ -33,14 +33,19 @@ deep-copy-regex: - source: "/{{ proto_path }}/(v.*)/.*-java/samples/snippets/generated" dest: "/owl-bot-staging/{{ module_name }}/$1/samples/snippets/generated" {%- else %} +{%- if proto_only %} - source: "/{{ proto_path }}/.*-java/proto-google-.*/src" - dest: "/owl-bot-staging/{{ module_name }}/proto-{{ artifact_id }}/src" + dest: "/owl-bot-staging/{{ module_name }}/{{ unversioned_dir }}/{{ artifact_id }}/src" +{%- else %} +- source: "/{{ proto_path }}/.*-java/proto-google-.*/src" + dest: "/owl-bot-staging/{{ module_name }}/{{ unversioned_dir }}/proto-{{ artifact_id }}/src" - source: "/{{ proto_path }}/.*-java/grpc-google-.*/src" - dest: "/owl-bot-staging/{{ module_name }}/grpc-{{ artifact_id }}/src" + dest: "/owl-bot-staging/{{ module_name }}/{{ unversioned_dir }}/grpc-{{ artifact_id }}/src" - source: "/{{ proto_path }}/.*-java/gapic-google-.*/src" - dest: "/owl-bot-staging/{{ module_name }}/{{ artifact_id }}/src" + dest: "/owl-bot-staging/{{ module_name }}/{{ unversioned_dir }}/{{ artifact_id }}/src" - source: "/{{ proto_path }}/.*-java/samples/snippets/generated" - dest: "/owl-bot-staging/{{ module_name }}/samples/snippets/generated" + dest: "/owl-bot-staging/{{ module_name }}/{{ unversioned_dir }}/samples/snippets/generated" +{%- endif %} {%- endif %} {%- endif %} diff --git a/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-gapic-golden.yaml b/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-gapic-golden.yaml new file mode 100644 index 000000000000..33dcf88e4e32 --- /dev/null +++ b/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-gapic-golden.yaml @@ -0,0 +1,35 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +deep-remove-regex: +- "/java-bare-metal-solution/grpc-google-.*/src" +- "/java-bare-metal-solution/proto-google-.*/src" +- "/java-bare-metal-solution/google-.*/src" +- "/java-bare-metal-solution/samples/snippets/generated" + +deep-preserve-regex: +- "/java-bare-metal-solution/google-.*/src/test/java/com/google/cloud/.*/v.*/it/IT.*Test.java" + +deep-copy-regex: +- source: "/google/cloud/baremetalsolution/.*-java/proto-google-.*/src" + dest: "/owl-bot-staging/java-bare-metal-solution/baremetalsolution/proto-google-cloud-bare-metal-solution/src" +- source: "/google/cloud/baremetalsolution/.*-java/grpc-google-.*/src" + dest: "/owl-bot-staging/java-bare-metal-solution/baremetalsolution/grpc-google-cloud-bare-metal-solution/src" +- source: "/google/cloud/baremetalsolution/.*-java/gapic-google-.*/src" + dest: "/owl-bot-staging/java-bare-metal-solution/baremetalsolution/google-cloud-bare-metal-solution/src" +- source: "/google/cloud/baremetalsolution/.*-java/samples/snippets/generated" + dest: "/owl-bot-staging/java-bare-metal-solution/baremetalsolution/samples/snippets/generated" + +api-name: baremetalsolution \ No newline at end of file diff --git a/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-proto-only-golden.yaml b/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-proto-only-golden.yaml new file mode 100644 index 000000000000..a93303a46d7d --- /dev/null +++ b/sdk-platform-java/hermetic_build/library_generation/tests/resources/goldens/.OwlBot-hermetic-unversioned-proto-only-golden.yaml @@ -0,0 +1,29 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +deep-remove-regex: +- "/java-bare-metal-solution/grpc-google-.*/src" +- "/java-bare-metal-solution/proto-google-.*/src" +- "/java-bare-metal-solution/google-.*/src" +- "/java-bare-metal-solution/samples/snippets/generated" + +deep-preserve-regex: +- "/java-bare-metal-solution/google-.*/src/test/java/com/google/cloud/.*/v.*/it/IT.*Test.java" + +deep-copy-regex: +- source: "/google/cloud/baremetalsolution/.*-java/proto-google-.*/src" + dest: "/owl-bot-staging/java-bare-metal-solution/baremetalsolution/google-cloud-bare-metal-solution/src" + +api-name: baremetalsolution \ No newline at end of file diff --git a/sdk-platform-java/hermetic_build/library_generation/tests/utilities_unit_tests.py b/sdk-platform-java/hermetic_build/library_generation/tests/utilities_unit_tests.py index 0708fdb4c556..72ccaa89c544 100644 --- a/sdk-platform-java/hermetic_build/library_generation/tests/utilities_unit_tests.py +++ b/sdk-platform-java/hermetic_build/library_generation/tests/utilities_unit_tests.py @@ -253,6 +253,38 @@ def test_generate_postprocessing_prerequisite_files_proto_only_repo_success(self ) self.__remove_postprocessing_prerequisite_files(path=library_path) + def test_generate_postprocessing_prerequisite_files_unversioned_proto_only_success(self): + self.maxDiff = None + library_path = self.__setup_postprocessing_prerequisite_files( + combination=3, + library_type="OTHER", + proto_path="google/cloud/baremetalsolution", + has_version=False, + proto_only=True, + ) + + file_comparator.compare_files( + f"{library_path}/.OwlBot-hermetic.yaml", + f"{library_path}/.OwlBot-hermetic-unversioned-proto-only-golden.yaml", + ) + self.__remove_postprocessing_prerequisite_files(path=library_path) + + def test_generate_postprocessing_prerequisite_files_unversioned_gapic_success(self): + self.maxDiff = None + library_path = self.__setup_postprocessing_prerequisite_files( + combination=2, + library_type="GAPIC_AUTO", + proto_path="google/cloud/baremetalsolution", + has_version=False, + proto_only=False, + ) + + file_comparator.compare_files( + f"{library_path}/.OwlBot-hermetic.yaml", + f"{library_path}/.OwlBot-hermetic-unversioned-gapic-golden.yaml", + ) + self.__remove_postprocessing_prerequisite_files(path=library_path) + def test_generate_postprocessing_prerequisite_files__custom_transport_set_in_config__success( self, ): @@ -326,6 +358,9 @@ def __setup_postprocessing_prerequisite_files( combination: int, library_type: str = "GAPIC_AUTO", library: LibraryConfig = library_1, + proto_path: str = "google/cloud/baremetalsolution/v2", + has_version: bool = True, + proto_only: bool = False, ) -> str: library_path = f"{resources_dir}/goldens" files = [ @@ -336,7 +371,6 @@ def __setup_postprocessing_prerequisite_files( cleanup(files) library.library_type = library_type config = self.__get_a_gen_config(combination, library_type=library_type) - proto_path = "google/cloud/baremetalsolution/v2" gapic_inputs = GapicInputs() # defaults to transport=grpc transport = library.get_transport(gapic_inputs) util.generate_postprocessing_prerequisite_files( @@ -345,6 +379,8 @@ def __setup_postprocessing_prerequisite_files( proto_path=proto_path, transport=transport, library_path=library_path, + has_version=has_version, + proto_only=proto_only, ) return library_path diff --git a/sdk-platform-java/hermetic_build/library_generation/utils/utilities.py b/sdk-platform-java/hermetic_build/library_generation/utils/utilities.py index 23fd92458919..9e9edc00d7b1 100755 --- a/sdk-platform-java/hermetic_build/library_generation/utils/utilities.py +++ b/sdk-platform-java/hermetic_build/library_generation/utils/utilities.py @@ -206,6 +206,7 @@ def generate_postprocessing_prerequisite_files( library_path: str, language: str = "java", has_version: bool = True, + proto_only: bool = False, ) -> None: """ Generates the postprocessing prerequisite files for a library. @@ -301,6 +302,7 @@ def generate_postprocessing_prerequisite_files( else f"{library_path}/.github/{owlbot_yaml_file}" ) if not os.path.exists(path_to_owlbot_yaml_file): + unversioned_dir = remove_version_from(proto_path).split("/")[-1] render( template_name="owlbot.yaml.monorepo.j2", output_name=path_to_owlbot_yaml_file, @@ -309,6 +311,8 @@ def generate_postprocessing_prerequisite_files( module_name=repo_metadata["repo_short"], api_shortname=library.api_shortname, has_version=has_version, + proto_only=proto_only, + unversioned_dir=unversioned_dir, ) # generate owlbot.py From c52c30cac4965eff28db4d39aab7a67dc8037bf4 Mon Sep 17 00:00:00 2001 From: Blake Li Date: Thu, 11 Jun 2026 05:22:22 +0000 Subject: [PATCH 2/2] feat(gax): implement Scotty Resumable Upload protocol --- sdk-platform-java/gax-java/SCOTTY_DESIGN.md | 252 +++++++++ .../gax/httpjson/HttpJsonCallableFactory.java | 14 + .../httpjson/HttpJsonResumableUploadCall.java | 512 ++++++++++++++++++ .../HttpJsonResumableUploadCallable.java | 103 ++++ .../gax/httpjson/ManagedHttpJsonChannel.java | 4 + .../HttpJsonResumableUploadCallableTest.java | 345 ++++++++++++ .../api/gax/rpc/InputStreamProvider.java | 50 ++ .../api/gax/rpc/ResumableUploadCallable.java | 87 +++ .../rpc/ResumableUploadProgressListener.java | 55 ++ .../api/gax/rpc/ResumableUploadRequest.java | 114 ++++ .../api/gax/rpc/ResumableUploadStatus.java | 74 +++ 11 files changed, 1610 insertions(+) create mode 100644 sdk-platform-java/gax-java/SCOTTY_DESIGN.md create mode 100644 sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCall.java create mode 100644 sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallable.java create mode 100644 sdk-platform-java/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallableTest.java create mode 100644 sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/InputStreamProvider.java create mode 100644 sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadCallable.java create mode 100644 sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadProgressListener.java create mode 100644 sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadRequest.java create mode 100644 sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadStatus.java diff --git a/sdk-platform-java/gax-java/SCOTTY_DESIGN.md b/sdk-platform-java/gax-java/SCOTTY_DESIGN.md new file mode 100644 index 000000000000..b72e4451a965 --- /dev/null +++ b/sdk-platform-java/gax-java/SCOTTY_DESIGN.md @@ -0,0 +1,252 @@ +# Resumable Upload Protocol (Scotty) for Java: Design Document + +This document proposes the architecture and design for integrating the Scotty Resumable Upload Protocol (RUP) into `gax-java` and the GAPIC code generator. + +--- + +## 1. Design Principles and Requirements + +1. **Veneer and GAPIC Aligned**: The solution must integrate cleanly into the existing `Callable` framework of `gax-java`. +2. **Stream-Safe Retries**: Java `InputStream` is forward-only. The design must provide a clean abstraction (`InputStreamProvider`) to recreate or seek the stream during recovery. +3. **Double-Loop Retry & Recovery**: Implements the precise Category 1 (transient) and Category 2 (state consistency) error classification with backoffs as described in the RUP specifications. +4. **Progress Reporting**: Supports asynchronous progress updates via a simple callback mechanism. +5. **No unnecessary chunking**: By default, uploads send the remaining bytes in one request (avoiding unnecessary memory buffering or chunk management). + +--- + +## 2. API Design (`gax` Changes) + +We introduce a new callable type and request/response wrappers in `com.google.api.gax.rpc`. + +### 2.1. `InputStreamProvider` + +To support seeking/rewinding, the stream source is wrapped in a functional interface that can supply fresh streams on retry: + +```java +package com.google.api.gax.rpc; + +import java.io.IOException; +import java.io.InputStream; + +/** Provides a fresh {@link InputStream} for retriable upload operations. */ +@FunctionalInterface +public interface InputStreamProvider { + /** Returns a new {@link InputStream}. */ + InputStream get() throws IOException; +} +``` + +### 2.2. Progress Listener and Status + +```java +package com.google.api.gax.rpc; + +/** Listener for tracking progress of a resumable upload. */ +public interface ResumableUploadProgressListener { + + enum State { + NOT_STARTED, + IN_PROGRESS, + RECOVERING, + COMPLETED, + FAILED, + CANCELLED + } + + void onProgress(ResumableUploadStatus status); +} + +/** Status details for progress updates. */ +public final class ResumableUploadStatus { + private final long bytesUploaded; + private final long totalBytes; + private final ResumableUploadProgressListener.State state; + + public ResumableUploadStatus(long bytesUploaded, long totalBytes, ResumableUploadProgressListener.State state) { + this.bytesUploaded = bytesUploaded; + this.totalBytes = totalBytes; + this.state = state; + } + + public long getBytesUploaded() { return bytesUploaded; } + public long getTotalBytes() { return totalBytes; } + public ResumableUploadProgressListener.State getState() { return state; } +} +``` + +### 2.3. Request Wrapper: `ResumableUploadRequest` + +```java +package com.google.api.gax.rpc; + +import com.google.common.base.Preconditions; + +public final class ResumableUploadRequest { + private final RequestT request; + private final InputStreamProvider streamProvider; + private final long totalBytes; // -1 if unknown + private final ResumableUploadProgressListener progressListener; + + private ResumableUploadRequest(Builder builder) { + this.request = Preconditions.checkNotNull(builder.request); + this.streamProvider = Preconditions.checkNotNull(builder.streamProvider); + this.totalBytes = builder.totalBytes; + this.progressListener = builder.progressListener; + } + + public RequestT getRequest() { return request; } + public InputStreamProvider getStreamProvider() { return streamProvider; } + public long getTotalBytes() { return totalBytes; } + public ResumableUploadProgressListener getProgressListener() { return progressListener; } + + public static Builder newBuilder() { + return new Builder<>(); + } + + public static class Builder { + private RequestT request; + private InputStreamProvider streamProvider; + private long totalBytes = -1; + private ResumableUploadProgressListener progressListener; + + public Builder setRequest(RequestT request) { + this.request = request; + return this; + } + public Builder setStreamProvider(InputStreamProvider streamProvider) { + this.streamProvider = streamProvider; + return this; + } + public Builder setTotalBytes(long totalBytes) { + this.totalBytes = totalBytes; + return this; + } + public Builder setProgressListener(ResumableUploadProgressListener progressListener) { + this.progressListener = progressListener; + return this; + } + public ResumableUploadRequest build() { + return new ResumableUploadRequest<>(this); + } + } +} +``` + +### 2.4. Callable Wrapper: `ResumableUploadCallable` + +```java +package com.google.api.gax.rpc; + +import com.google.api.core.ApiFuture; + +public abstract class ResumableUploadCallable { + + protected ResumableUploadCallable() {} + + public abstract ApiFuture futureCall( + ResumableUploadRequest request, ApiCallContext context); + + public ResponseT call(ResumableUploadRequest request, ApiCallContext context) { + return ApiExceptions.callAndTranslateCharSequenceException(futureCall(request, context)); + } + + public ResponseT call(ResumableUploadRequest request) { + return call(request, null); + } +} +``` + +--- + +## 3. Transport Implementation (`gax-httpjson`) + +The transport layer executes the actual HTTP protocol calls using the Google HTTP Client. + +We introduce `HttpJsonResumableUploadCall` to coordinate the Scotty state machine. + +### 3.1. Error Categorization in Java + +```java +private enum ErrorCategory { + CATEGORY_1_TRANSIENT, // 429, 500, 502, 503, 504, TCP/Socket Timeout + CATEGORY_2_MISMATCH, // 400, 412, 416 + CATEGORY_3_FATAL // 401, 403, 404, etc. +} + +private ErrorCategory getErrorCategory(Throwable t) { + if (t instanceof HttpResponseException) { + int statusCode = ((HttpResponseException) t).getStatusCode(); + if (statusCode == 429 || statusCode >= 500) { + return ErrorCategory.CATEGORY_1_TRANSIENT; + } + if (statusCode == 400 || statusCode == 412 || statusCode == 416) { + return ErrorCategory.CATEGORY_2_MISMATCH; + } + } + if (t instanceof IOException) { + // Socket timeouts, connection drops + return ErrorCategory.CATEGORY_1_TRANSIENT; + } + return ErrorCategory.CATEGORY_3_FATAL; +} +``` + +### 3.2. Detailed Execution Flow (State Machine) + +The `HttpJsonResumableUploadCall` runs inside the user's thread (or client executor pool for future execution) and implements the following flow: + +```mermaid +stateDiagram-v2 + [*] --> StartSession + StartSession --> UploadLoop : Success (200 OK + active) + StartSession --> StartSession : Cat 1 Transient (Backoff) + StartSession --> [*] : Cat 3 Fatal / Deadline Exceeded + + state UploadLoop { + [*] --> OpenStream + OpenStream --> SkipToOffset + SkipToOffset --> TransmitChunk + TransmitChunk --> [*] : Success (final) + TransmitChunk --> QueryState : Cat 2 Mismatch / Socket Drop + TransmitChunk --> TransmitChunk : Cat 1 Transient (Backoff) + } + + UploadLoop --> [*] : Success + UploadLoop --> [*] : Cat 3 Fatal / Deadline Exceeded + + state QueryState { + [*] --> SendQuery + SendQuery --> ResumeUpload : Success (active + new offset) + SendQuery --> [*] : Success (final) + SendQuery --> SendQuery : Cat 1 Transient (Backoff) + SendQuery --> [*] : Cat 3 Fatal + } + + QueryState --> UploadLoop : Resume +``` + +#### Step 1: Start Session +- Build standard headers + merge user-provided metadata. +- Pre-emptively prefix headers that affect physical bodies (`Content-Length`, `Content-Type`, etc.) with `X-Goog-Upload-Header-`. +- Set `X-Goog-Upload-Protocol: resumable` and `X-Goog-Upload-Command: start`. +- Execute POST with the request JSON body. +- Extract `X-Goog-Upload-URL` header value to obtain the `uploadUrl`. + +#### Step 2: Upload Loop (Transmit) +- Check absolute global deadline. +- Call `streamProvider.get()`. +- Skip/seek to current `offset`. +- Set `X-Goog-Upload-Command: upload, finalize` and `X-Goog-Upload-Offset: offset`. +- Stream payload using a chunked output stream, updating the progress listener during writes. +- If response is `final` with `2xx`: parse response and return. +- If exception occurs: Categorize exception. If Category 2 (Mismatch) or connection drop, transition to **Query State**. + +#### Step 3: Query State +- Execute POST to `uploadUrl` with `X-Goog-Upload-Command: query`. +- If response is `active`: + - Extract `X-Goog-Upload-Size-Received` -> `newOffset`. + - If `newOffset == offset`: apply backoff (to avoid spamming server). + - Update `offset = newOffset` and transition back to **Upload Loop**. +- If response is `final`: return response. +- If Category 1 (Transient) error: retry query with backoff. +- If Category 3 (Fatal) error: fail immediately. diff --git a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java index 447fc46dd9e0..72144f7a2b6c 100644 --- a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java +++ b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java @@ -41,6 +41,7 @@ import com.google.api.gax.rpc.PagedCallSettings; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.ResumableUploadCallable; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.ApiTracerContext; @@ -220,6 +221,19 @@ ServerStreamingCallable createServerStreamingCallable( return callable.withDefaultCallContext(clientContext.getDefaultCallContext()); } + /** + * Create a resumable upload callable object. Designed for use by generated code. + * + * @param httpJsonCallSettings the http/json call settings + * @param clientContext {@link ClientContext} to use to connect to the service. + * @return {@link ResumableUploadCallable} callable object. + */ + public static ResumableUploadCallable createResumableUploadCallable( + HttpJsonCallSettings httpJsonCallSettings, + ClientContext clientContext) { + return new HttpJsonResumableUploadCallable<>(httpJsonCallSettings, clientContext); + } + static ApiTracerContext getApiTracerContext(@Nonnull ApiMethodDescriptor methodDescriptor) { return ApiTracerContext.newBuilder() .setFullMethodName(methodDescriptor.getFullMethodName()) diff --git a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCall.java b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCall.java new file mode 100644 index 000000000000..3859ae27a448 --- /dev/null +++ b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCall.java @@ -0,0 +1,512 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.client.http.EmptyContent; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpMediaType; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.json.JsonHttpContent; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.client.util.GenericData; +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.InputStreamProvider; +import com.google.api.gax.rpc.ResumableUploadProgressListener; +import com.google.api.gax.rpc.ResumableUploadRequest; +import com.google.api.gax.rpc.ResumableUploadStatus; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** Encapsulates the execution logic and state machine of the Scotty Resumable Upload protocol. */ +final class HttpJsonResumableUploadCall { + + private static final Logger logger = Logger.getLogger(HttpJsonResumableUploadCall.class.getName()); + private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + + private final ApiMethodDescriptor methodDescriptor; + private final ResumableUploadRequest uploadRequest; + private final HttpTransport httpTransport; + private final HttpJsonMetadata requestHeaders; + private final HttpJsonCallOptions callOptions; + private final String endpoint; + private final Executor executor; + + private enum ErrorCategory { + CATEGORY_1_TRANSIENT, + CATEGORY_2_MISMATCH, + CATEGORY_3_FATAL + } + + HttpJsonResumableUploadCall( + ApiMethodDescriptor methodDescriptor, + ResumableUploadRequest uploadRequest, + HttpTransport httpTransport, + HttpJsonMetadata requestHeaders, + HttpJsonCallOptions callOptions, + String endpoint, + Executor executor) { + this.methodDescriptor = Preconditions.checkNotNull(methodDescriptor); + this.uploadRequest = Preconditions.checkNotNull(uploadRequest); + this.httpTransport = Preconditions.checkNotNull(httpTransport); + this.requestHeaders = Preconditions.checkNotNull(requestHeaders); + this.callOptions = Preconditions.checkNotNull(callOptions); + this.endpoint = Preconditions.checkNotNull(endpoint); + this.executor = Preconditions.checkNotNull(executor); + } + + ApiFuture execute() { + SettableApiFuture future = SettableApiFuture.create(); + executor.execute( + () -> { + try { + ResponseT result = runStateMachine(); + future.set(result); + } catch (Throwable t) { + future.setException(t); + } + }); + return future; + } + + private ResponseT runStateMachine() throws Exception { + try { + return runStateMachineInternal(); + } catch (HttpResponseException e) { + throw translateException(e); + } + } + + private ResponseT runStateMachineInternal() throws Exception { + Instant deadline = calculateDeadline(); + + // Phase 1: Start Session (with retry) + String uploadUrl = null; + int attempt = 0; + while (true) { + try { + checkDeadline(deadline); + uploadUrl = startSession(deadline); + break; // Success + } catch (Exception e) { + checkDeadline(deadline); + ErrorCategory category = getErrorCategory(e); + if (category == ErrorCategory.CATEGORY_1_TRANSIENT) { + attempt++; + long delayMs = calculateBackoff(attempt); + logger.log(Level.WARNING, "Transient error starting session. Backing off for " + delayMs + " ms", e); + sleep(delayMs); + } else { + throw e; // Fatal/Mismatch, bubble up + } + } + } + logger.log(Level.FINE, "Resumable session started. Upload URL: {0}", uploadUrl); + + long offset = 0; + attempt = 0; + long previousOffset = -1; + + // Phase 2 & 3 Loop: Transmit & Query Recovery + while (true) { + try { + checkDeadline(deadline); + return transmitRemaining(uploadUrl, offset, deadline); + } catch (Exception e) { + checkDeadline(deadline); + ErrorCategory category = getErrorCategory(e); + + if (category == ErrorCategory.CATEGORY_2_MISMATCH) { + logger.log(Level.WARNING, "State mismatch detected. Triggering recovery...", e); + updateProgress(offset, ResumableUploadProgressListener.State.RECOVERING); + + offset = recoverOffset(uploadUrl, deadline); + logger.log(Level.INFO, "Recovery completed. Server received bytes: {0}", offset); + + if (offset == previousOffset) { + // No progress was made since last recovery. Wait with backoff to prevent slamming server. + attempt++; + long delayMs = calculateBackoff(attempt); + sleep(delayMs); + } else { + attempt = 0; // Reset attempts on progress + previousOffset = offset; + } + } else if (category == ErrorCategory.CATEGORY_1_TRANSIENT) { + attempt++; + long delayMs = calculateBackoff(attempt); + logger.log(Level.WARNING, "Transient error. Backing off for {0} ms (attempt {1})", new Object[]{delayMs, attempt}); + sleep(delayMs); + } else { + updateProgress(offset, ResumableUploadProgressListener.State.FAILED); + throw e; // Fatal, bubble up + } + } + } + } + + private String startSession(Instant deadline) throws Exception { + HttpRequestFactory requestFactory = getRequestFactory(); + HttpRequestFormatter requestFormatter = methodDescriptor.getRequestFormatter(); + + GenericData tokenRequest = new GenericData(); + String requestBody = requestFormatter.getRequestBody(uploadRequest.getRequest()); + HttpContent initialContent; + + if (!Strings.isNullOrEmpty(requestBody)) { + JSON_FACTORY.createJsonParser(requestBody).parse(tokenRequest); + initialContent = new JsonHttpContent(JSON_FACTORY, tokenRequest) + .setMediaType(new HttpMediaType("application/json; charset=utf-8")); + } else { + initialContent = new EmptyContent(); + } + + // Scotty specific path modifier + String path = "/resumable/upload" + requestFormatter.getPath(uploadRequest.getRequest()); + GenericUrl url = new GenericUrl(normalizeEndpoint(endpoint) + path); + + // Populate query parameters + Map> queryParams = requestFormatter.getQueryParamNames(uploadRequest.getRequest()); + for (Map.Entry> queryParam : queryParams.entrySet()) { + if (queryParam.getValue() != null) { + url.set(queryParam.getKey(), queryParam.getValue()); + } + } + + HttpRequest httpRequest = requestFactory.buildPostRequest(url, initialContent); + configureTimeouts(httpRequest, deadline); + + // Set standard headers + merge custom metadata + for (Map.Entry entry : requestHeaders.getHeaders().entrySet()) { + String key = entry.getKey(); + String value = (String) entry.getValue(); + + // Prefix metadata headers to prevent collision with physical request metadata + if (isMetadataHeaderDenylisted(key)) { + httpRequest.getHeaders().set("X-Goog-Upload-Header-" + key, value); + } else { + httpRequest.getHeaders().set(key, value); + } + } + + httpRequest.getHeaders().set("X-Goog-Upload-Protocol", "resumable"); + httpRequest.getHeaders().set("X-Goog-Upload-Command", "start"); + + updateProgress(0, ResumableUploadProgressListener.State.NOT_STARTED); + + HttpResponse response = null; + try { + response = httpRequest.execute(); + String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status"); + if (!"active".equalsIgnoreCase(status)) { + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Failed to initiate resumable session: Status is not active") + .build(); + } + String uploadUrl = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-URL"); + if (Strings.isNullOrEmpty(uploadUrl)) { + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Failed to initiate resumable session: Missing upload URL") + .build(); + } + return uploadUrl; + } finally { + if (response != null) { + response.disconnect(); + } + } + } + + private ResponseT transmitRemaining(String uploadUrl, long offset, Instant deadline) throws Exception { + HttpRequestFactory requestFactory = getRequestFactory(); + + InputStream stream = uploadRequest.getStreamProvider().get(); + if (offset > 0) { + long skipped = stream.skip(offset); + if (skipped < offset) { + throw new IOException("Failed to skip stream bytes to offset: " + offset); + } + } + + // Wrap the stream in custom HttpContent that updates the progress listener + HttpContent payload = new ProgressReportingHttpContent( + stream, uploadRequest.getTotalBytes(), offset, uploadRequest.getProgressListener()); + + GenericUrl url = new GenericUrl(uploadUrl); + HttpRequest httpRequest = requestFactory.buildPostRequest(url, payload); + configureTimeouts(httpRequest, deadline); + + httpRequest.getHeaders().set("X-Goog-Upload-Command", "upload, finalize"); + httpRequest.getHeaders().set("X-Goog-Upload-Offset", String.valueOf(offset)); + + updateProgress(offset, ResumableUploadProgressListener.State.IN_PROGRESS); + + HttpResponse response = null; + try { + response = httpRequest.execute(); + String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status"); + + if (!"final".equalsIgnoreCase(status)) { + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Resumable upload failed: Status is not final") + .build(); + } + + InputStreamReader reader = new InputStreamReader(response.getContent(), StandardCharsets.UTF_8); + ResponseT parsedResponse = methodDescriptor.getResponseParser().parse(reader, callOptions.getTypeRegistry()); + + updateProgress(uploadRequest.getTotalBytes() > 0 ? uploadRequest.getTotalBytes() : offset, + ResumableUploadProgressListener.State.COMPLETED); + + return parsedResponse; + } finally { + if (response != null) { + response.disconnect(); + } + } + } + + private long recoverOffset(String uploadUrl, Instant deadline) throws Exception { + HttpRequestFactory requestFactory = getRequestFactory(); + GenericUrl url = new GenericUrl(uploadUrl); + + HttpRequest httpRequest = requestFactory.buildPostRequest(url, new EmptyContent()); + configureTimeouts(httpRequest, deadline); + + httpRequest.getHeaders().set("X-Goog-Upload-Command", "query"); + + HttpResponse response = null; + try { + response = httpRequest.execute(); + String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status"); + + if ("final".equalsIgnoreCase(status)) { + // Already finalized, query command behaves like final + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Query returned final status. Re-executing state machine...") + .build(); + } + if (!"active".equalsIgnoreCase(status)) { + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Query failed: Status is not active") + .build(); + } + + String receivedSizeStr = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Size-Received"); + if (Strings.isNullOrEmpty(receivedSizeStr)) { + throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders()) + .setMessage("Query failed: Missing size received header") + .build(); + } + + return Long.parseLong(receivedSizeStr); + } finally { + if (response != null) { + response.disconnect(); + } + } + } + + private void configureTimeouts(HttpRequest request, Instant deadline) { + long remainingMs = Duration.between(Instant.now(), deadline).toMillis(); + if (remainingMs <= 0) { + remainingMs = 1; // force timeout + } + request.setConnectTimeout((int) remainingMs); + request.setReadTimeout((int) remainingMs); + } + + private Instant calculateDeadline() { + Duration timeout = callOptions.getTimeoutDuration(); + if (timeout != null && !timeout.isZero() && !timeout.isNegative()) { + return Instant.now().plus(timeout); + } + return Instant.now().plus(Duration.ofMinutes(10)); // Default deadline of 10 mins + } + + private void checkDeadline(Instant deadline) throws DeadlineExceededException { + if (Instant.now().isAfter(deadline)) { + throw (DeadlineExceededException) com.google.api.gax.rpc.ApiExceptionFactory.createException( + "Resumable upload session exceeded the configured deadline.", + null, + HttpJsonStatusCode.of(com.google.api.gax.rpc.StatusCode.Code.DEADLINE_EXCEEDED), + false); + } + } + + private ErrorCategory getErrorCategory(Throwable t) { + if (t instanceof HttpResponseException) { + int statusCode = ((HttpResponseException) t).getStatusCode(); + if (statusCode == 429 || statusCode >= 500) { + return ErrorCategory.CATEGORY_1_TRANSIENT; + } + if (statusCode == 400 || statusCode == 412 || statusCode == 416) { + return ErrorCategory.CATEGORY_2_MISMATCH; + } + return ErrorCategory.CATEGORY_3_FATAL; + } + if (t instanceof IOException) { + return ErrorCategory.CATEGORY_1_TRANSIENT; + } + return ErrorCategory.CATEGORY_3_FATAL; + } + + private long calculateBackoff(int attempt) { + long baseDelay = 500; // 500ms + long maxDelay = 30000; // 30s + long delay = (long) (baseDelay * Math.pow(2, attempt)); + return Math.min(delay, maxDelay); + } + + private void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void updateProgress(long bytesUploaded, ResumableUploadProgressListener.State state) { + ResumableUploadProgressListener progressListener = uploadRequest.getProgressListener(); + if (progressListener != null) { + progressListener.onProgress(new ResumableUploadStatus(bytesUploaded, uploadRequest.getTotalBytes(), state)); + } + } + + private HttpRequestFactory getRequestFactory() { + Credentials credentials = callOptions.getCredentials(); + if (credentials != null) { + return httpTransport.createRequestFactory(new HttpCredentialsAdapter(credentials)); + } + return httpTransport.createRequestFactory(); + } + + private boolean isMetadataHeaderDenylisted(String key) { + // Standard body-related headers that must be prefixed when uploading session metadata + return "Content-Length".equalsIgnoreCase(key) + || "Content-Type".equalsIgnoreCase(key) + || "Content-Encoding".equalsIgnoreCase(key) + || "Transfer-Encoding".equalsIgnoreCase(key); + } + + private String normalizeEndpoint(String rawEndpoint) { + String normalized = rawEndpoint; + if (!normalized.contains("://")) { + normalized = "https://" + normalized; + } + if (normalized.charAt(normalized.length() - 1) != '/') { + normalized += '/'; + } + return normalized; + } + + private Exception translateException(HttpResponseException e) { + // Return standard GAX Exception or original depending on status code + HttpJsonApiExceptionFactory factory = new HttpJsonApiExceptionFactory(java.util.Collections.emptySet()); + return factory.create(e); + } + + /** Custom HttpContent class that streams data and reports progress callbacks. */ + private static final class ProgressReportingHttpContent implements HttpContent { + private final InputStream stream; + private final long totalLength; + private final long initialOffset; + private final ResumableUploadProgressListener progressListener; + + ProgressReportingHttpContent( + InputStream stream, + long totalLength, + long initialOffset, + ResumableUploadProgressListener progressListener) { + this.stream = stream; + this.totalLength = totalLength; + this.initialOffset = initialOffset; + this.progressListener = progressListener; + } + + @Override + public long getLength() throws IOException { + // Return -1 to force chunked transfer encoding as the length of the remaining stream + // might be different from physical Content-Length, or if totalLength is unknown. + return -1; + } + + @Override + public String getType() { + return "application/octet-stream"; + } + + @Override + public boolean retrySupported() { + // Handled by our state machine recreating the stream via streamProvider + return false; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + byte[] buffer = new byte[65536]; // 64KB buffer + int len; + long bytesUploaded = initialOffset; + while ((len = stream.read(buffer)) != -1) { + out.write(buffer, 0, len); + out.flush(); + bytesUploaded += len; + if (progressListener != null) { + progressListener.onProgress( + new ResumableUploadStatus( + bytesUploaded, totalLength, ResumableUploadProgressListener.State.IN_PROGRESS)); + } + } + } + } +} diff --git a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallable.java b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallable.java new file mode 100644 index 000000000000..19491a193683 --- /dev/null +++ b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallable.java @@ -0,0 +1,103 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.client.http.HttpTransport; +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ResumableUploadCallable; +import com.google.api.gax.rpc.ResumableUploadRequest; +import com.google.common.base.Preconditions; +import java.util.concurrent.Executor; + +/** + * A {@link ResumableUploadCallable} that uses HTTP/JSON transport. + * + * @param request type + * @param response type + */ +@BetaApi +public final class HttpJsonResumableUploadCallable + extends ResumableUploadCallable { + + private final HttpJsonCallSettings httpJsonCallSettings; + private final ClientContext clientContext; + + public HttpJsonResumableUploadCallable( + HttpJsonCallSettings httpJsonCallSettings, + ClientContext clientContext) { + this.httpJsonCallSettings = Preconditions.checkNotNull(httpJsonCallSettings); + this.clientContext = Preconditions.checkNotNull(clientContext); + } + + @Override + public ApiFuture futureCall( + ResumableUploadRequest request, ApiCallContext context) { + Preconditions.checkNotNull(request); + + // Resolve call context + HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault(); + if (context != null) { + httpJsonContext = httpJsonContext.nullToSelf(context); + } + + // Resolve channel and endpoint + HttpJsonTransportChannel transportChannel = (HttpJsonTransportChannel) clientContext.getTransportChannel(); + ManagedHttpJsonChannel channel = transportChannel.getManagedChannel(); + String endpoint = channel.getEndpoint(); + HttpTransport httpTransport = channel.getHttpTransport(); + + // Resolve credentials and executor + HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions(); + if (callOptions.getCredentials() == null && clientContext.getCredentials() != null) { + callOptions = callOptions.toBuilder().setCredentials(clientContext.getCredentials()).build(); + } + + Executor executor = clientContext.getExecutor(); + + // Gather request headers + HttpJsonMetadata requestHeaders = HttpJsonMetadata.newBuilder().build() + .withHeaders(httpJsonContext.getExtraHeaders()); + + HttpJsonResumableUploadCall call = new HttpJsonResumableUploadCall<>( + httpJsonCallSettings.getMethodDescriptor(), + request, + httpTransport, + requestHeaders, + callOptions, + endpoint, + executor + ); + + return call.execute(); + } +} diff --git a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index bd3bed855608..69dc242d6e8b 100644 --- a/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -91,6 +91,10 @@ Executor getExecutor() { return executor; } + HttpTransport getHttpTransport() { + return httpTransport; + } + @Override public synchronized void shutdown() { // Calling shutdown/ shutdownNow() twice should no-op diff --git a/sdk-platform-java/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallableTest.java b/sdk-platform-java/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallableTest.java new file mode 100644 index 000000000000..2a8c08cbb02e --- /dev/null +++ b/sdk-platform-java/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonResumableUploadCallableTest.java @@ -0,0 +1,345 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.InputStreamProvider; +import com.google.api.gax.rpc.ResumableUploadProgressListener; +import com.google.api.gax.rpc.ResumableUploadRequest; +import com.google.api.gax.rpc.ResumableUploadStatus; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.TypeRegistry; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class HttpJsonResumableUploadCallableTest { + + @Mock private ApiMethodDescriptor methodDescriptor; + @Mock private HttpRequestFormatter requestFormatter; + @Mock private HttpResponseParser responseParser; + @Mock private ClientContext clientContext; + @Mock private HttpJsonTransportChannel transportChannel; + @Mock private ManagedHttpJsonChannel managedChannel; + + private java.util.concurrent.ScheduledExecutorService executor; + + @BeforeEach + void setUp() { + executor = Executors.newSingleThreadScheduledExecutor(); + Mockito.lenient().when(clientContext.getExecutor()).thenReturn(executor); + Mockito.lenient().when(clientContext.getTransportChannel()).thenReturn(transportChannel); + Mockito.lenient().when(transportChannel.getManagedChannel()).thenReturn(managedChannel); + Mockito.lenient().when(managedChannel.getEndpoint()).thenReturn("localhost"); + + // Wire formatter and parser mocking + Mockito.lenient().when(methodDescriptor.getRequestFormatter()).thenReturn(requestFormatter); + Mockito.lenient().when(methodDescriptor.getResponseParser()).thenReturn(responseParser); + Mockito.lenient().when(requestFormatter.getPath(Mockito.anyString())).thenReturn("/upload/resource"); + Mockito.lenient().when(requestFormatter.getRequestBody(Mockito.anyString())).thenReturn("{\"metadata\":\"value\"}"); + Mockito.lenient().when(requestFormatter.getQueryParamNames(Mockito.anyString())).thenReturn(Collections.emptyMap()); + } + + @AfterEach + void tearDown() { + executor.shutdown(); + } + + @Test + void happyPathUpload() throws Exception { + byte[] data = "Hello, World! Scotty upload".getBytes(); + InputStreamProvider streamProvider = () -> new ByteArrayInputStream(data); + + // Sequence of mock HTTP responses + Queue mockResponses = new ConcurrentLinkedQueue<>(); + + // 1. Session start response + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "active") + .addHeader("X-Goog-Upload-URL", "https://localhost/session/12345")); + + // 2. Finalize upload response + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "final") + .setContent("{\"response\":\"success\"}")); + + MockHttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + MockLowLevelHttpResponse response = mockResponses.poll(); + if (response == null) { + throw new IOException("Unexpected out-of-bounds mock request: " + url); + } + return response; + } + }; + } + }; + + Mockito.when(managedChannel.getHttpTransport()).thenReturn(transport); + Mockito.when(responseParser.parse(Mockito.any(Reader.class), Mockito.nullable(TypeRegistry.class))) + .thenReturn("SUCCESS_RESPONSE"); + + List progressHistory = new ArrayList<>(); + ResumableUploadProgressListener listener = progressHistory::add; + + ResumableUploadRequest request = ResumableUploadRequest.newBuilder() + .setRequest("META") + .setStreamProvider(streamProvider) + .setTotalBytes(data.length) + .setProgressListener(listener) + .build(); + + HttpJsonCallSettings settings = HttpJsonCallSettings.newBuilder() + .setMethodDescriptor(methodDescriptor) + .build(); + + HttpJsonResumableUploadCallable callable = new HttpJsonResumableUploadCallable<>(settings, clientContext); + + String response = callable.call(request); + + assertThat(response).isEqualTo("SUCCESS_RESPONSE"); + assertThat(progressHistory).isNotEmpty(); + + // Verify progress tracking states + assertThat(progressHistory.get(0).getState()).isEqualTo(ResumableUploadProgressListener.State.NOT_STARTED); + + ResumableUploadStatus lastStatus = progressHistory.get(progressHistory.size() - 1); + assertThat(lastStatus.getState()).isEqualTo(ResumableUploadProgressListener.State.COMPLETED); + assertThat(lastStatus.getBytesUploaded()).isEqualTo(data.length); + } + + @Test + void retryOnTransientStartError() throws Exception { + byte[] data = "Short stream".getBytes(); + InputStreamProvider streamProvider = () -> new ByteArrayInputStream(data); + + Queue mockResponses = new ConcurrentLinkedQueue<>(); + + // 1. Session start transient failure (503) + mockResponses.add(new MockLowLevelHttpResponse().setStatusCode(503).setReasonPhrase("Service Unavailable").setContent("")); + + // 2. Retry start session success + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "active") + .addHeader("X-Goog-Upload-URL", "https://localhost/session/12345")); + + // 3. Finalize upload success + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "final") + .setContent("{\"response\":\"ok\"}")); + + MockHttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponses.poll(); + } + }; + } + }; + + Mockito.when(managedChannel.getHttpTransport()).thenReturn(transport); + Mockito.when(responseParser.parse(Mockito.any(Reader.class), Mockito.nullable(TypeRegistry.class))) + .thenReturn("OK_REP"); + + ResumableUploadRequest request = ResumableUploadRequest.newBuilder() + .setRequest("META") + .setStreamProvider(streamProvider) + .build(); + + HttpJsonCallSettings settings = HttpJsonCallSettings.newBuilder() + .setMethodDescriptor(methodDescriptor) + .build(); + + HttpJsonResumableUploadCallable callable = new HttpJsonResumableUploadCallable<>(settings, clientContext); + + String response = callable.call(request); + assertThat(response).isEqualTo("OK_REP"); + } + + @Test + void stateMismatchRecoveryAndResume() throws Exception { + byte[] data = "First segment of data... Second segment of data".getBytes(); + InputStreamProvider streamProvider = () -> new ByteArrayInputStream(data); + + Queue mockResponses = new ConcurrentLinkedQueue<>(); + + // 1. Session start success + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "active") + .addHeader("X-Goog-Upload-URL", "https://localhost/session/12345")); + + // 2. Transmit failure with Category 2 (400 Bad Request) + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(400) + .setReasonPhrase("Bad Request - offset mismatch") + .setContent("")); + + // 3. Query offset command (server has received 24 bytes) + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "active") + .addHeader("X-Goog-Upload-Size-Received", "24")); + + // 4. Upload rest starting from offset 24 success + mockResponses.add( + new MockLowLevelHttpResponse() + .setStatusCode(200) + .addHeader("X-Goog-Upload-Status", "final") + .setContent("{\"response\":\"restored\"}")); + + MockHttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponses.poll(); + } + }; + } + }; + + Mockito.when(managedChannel.getHttpTransport()).thenReturn(transport); + Mockito.when(responseParser.parse(Mockito.any(Reader.class), Mockito.nullable(TypeRegistry.class))) + .thenReturn("RECOVERY_REP"); + + List progressHistory = new ArrayList<>(); + ResumableUploadProgressListener listener = progressHistory::add; + + ResumableUploadRequest request = ResumableUploadRequest.newBuilder() + .setRequest("META") + .setStreamProvider(streamProvider) + .setTotalBytes(data.length) + .setProgressListener(listener) + .build(); + + HttpJsonCallSettings settings = HttpJsonCallSettings.newBuilder() + .setMethodDescriptor(methodDescriptor) + .build(); + + HttpJsonResumableUploadCallable callable = new HttpJsonResumableUploadCallable<>(settings, clientContext); + + String response = callable.call(request); + + assertThat(response).isEqualTo("RECOVERY_REP"); + + // Verify recovery state was logged + boolean hasRecoveringState = false; + for (ResumableUploadStatus status : progressHistory) { + if (status.getState() == ResumableUploadProgressListener.State.RECOVERING) { + hasRecoveringState = true; + break; + } + } + assertThat(hasRecoveringState).isTrue(); + } + + @Test + void fatalErrorFailsImmediately() { + byte[] data = "test data".getBytes(); + InputStreamProvider streamProvider = () -> new ByteArrayInputStream(data); + + Queue mockResponses = new ConcurrentLinkedQueue<>(); + + // 1. Session start returns 403 Forbidden (Category 3 Fatal) + mockResponses.add(new MockLowLevelHttpResponse().setStatusCode(403).setReasonPhrase("Forbidden").setContent("")); + + MockHttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponses.poll(); + } + }; + } + }; + + Mockito.when(managedChannel.getHttpTransport()).thenReturn(transport); + + ResumableUploadRequest request = ResumableUploadRequest.newBuilder() + .setRequest("META") + .setStreamProvider(streamProvider) + .build(); + + HttpJsonCallSettings settings = HttpJsonCallSettings.newBuilder() + .setMethodDescriptor(methodDescriptor) + .build(); + + HttpJsonResumableUploadCallable callable = new HttpJsonResumableUploadCallable<>(settings, clientContext); + + assertThrows(ApiException.class, () -> callable.call(request)); + } +} diff --git a/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/InputStreamProvider.java b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/InputStreamProvider.java new file mode 100644 index 000000000000..9d65114c5dac --- /dev/null +++ b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/InputStreamProvider.java @@ -0,0 +1,50 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import com.google.api.core.BetaApi; +import java.io.IOException; +import java.io.InputStream; + +/** + * Provides a fresh {@link InputStream} for retriable upload operations. + * This is used to seek or rewind a stream when recovering from errors. + */ +@BetaApi +@FunctionalInterface +public interface InputStreamProvider { + /** + * Returns a new {@link InputStream}. + * + * @return a new input stream + * @throws IOException if the stream cannot be created + */ + InputStream get() throws IOException; +} diff --git a/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadCallable.java b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadCallable.java new file mode 100644 index 000000000000..b7754cea0c81 --- /dev/null +++ b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadCallable.java @@ -0,0 +1,87 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; + +/** + * A ResumableUploadCallable is an API-transport-independent wrapper for the Scotty Resumable Upload + * protocol. + * + * @param request type + * @param response type + */ +@BetaApi +public abstract class ResumableUploadCallable { + + protected ResumableUploadCallable() {} + + /** + * Performs the resumable upload asynchronously. + * + * @param request the upload request options + * @param context the context of the call + * @return future for the response + */ + public abstract ApiFuture futureCall( + ResumableUploadRequest request, ApiCallContext context); + + /** + * Performs the resumable upload asynchronously. + * + * @param request the upload request options + * @return future for the response + */ + public ApiFuture futureCall(ResumableUploadRequest request) { + return futureCall(request, null); + } + + /** + * Performs the resumable upload synchronously. + * + * @param request the upload request options + * @param context the context of the call + * @return the RPC response + */ + public ResponseT call(ResumableUploadRequest request, ApiCallContext context) { + return ApiExceptions.callAndTranslateApiException(futureCall(request, context)); + } + + /** + * Performs the resumable upload synchronously. + * + * @param request the upload request options + * @return the RPC response + */ + public ResponseT call(ResumableUploadRequest request) { + return call(request, null); + } +} diff --git a/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadProgressListener.java b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadProgressListener.java new file mode 100644 index 000000000000..f843f209f396 --- /dev/null +++ b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadProgressListener.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import com.google.api.core.BetaApi; + +/** Listener for tracking the progress of a resumable upload session. */ +@BetaApi +@FunctionalInterface +public interface ResumableUploadProgressListener { + + /** The state of the upload session. */ + enum State { + NOT_STARTED, + IN_PROGRESS, + RECOVERING, + COMPLETED, + FAILED, + CANCELLED + } + + /** + * Invoked when upload progress or state changes. + * + * @param status the current status of the upload + */ + void onProgress(ResumableUploadStatus status); +} diff --git a/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadRequest.java b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadRequest.java new file mode 100644 index 000000000000..a4a02781f112 --- /dev/null +++ b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadRequest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import com.google.api.core.BetaApi; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Parameter class for a resumable upload call. Contains the request metadata, the stream payload, + * and the progress listener. + * + * @param the type of request message that contains standard metadata + */ +@BetaApi +public final class ResumableUploadRequest { + private final RequestT request; + private final InputStreamProvider streamProvider; + private final long totalBytes; + private final ResumableUploadProgressListener progressListener; + + private ResumableUploadRequest(Builder builder) { + this.request = Preconditions.checkNotNull(builder.request); + this.streamProvider = Preconditions.checkNotNull(builder.streamProvider); + this.totalBytes = builder.totalBytes; + this.progressListener = builder.progressListener; + } + + /** Returns the metadata request message. */ + @Nonnull + public RequestT getRequest() { + return request; + } + + /** Returns the stream provider. */ + @Nonnull + public InputStreamProvider getStreamProvider() { + return streamProvider; + } + + /** Returns the total size of the stream, or -1 if unknown. */ + public long getTotalBytes() { + return totalBytes; + } + + /** Returns the progress listener, or null if not set. */ + @Nullable + public ResumableUploadProgressListener getProgressListener() { + return progressListener; + } + + public static Builder newBuilder() { + return new Builder<>(); + } + + public static class Builder { + private RequestT request; + private InputStreamProvider streamProvider; + private long totalBytes = -1; + private ResumableUploadProgressListener progressListener; + + public Builder setRequest(RequestT request) { + this.request = request; + return this; + } + + public Builder setStreamProvider(InputStreamProvider streamProvider) { + this.streamProvider = streamProvider; + return this; + } + + public Builder setTotalBytes(long totalBytes) { + this.totalBytes = totalBytes; + return this; + } + + public Builder setProgressListener(ResumableUploadProgressListener progressListener) { + this.progressListener = progressListener; + return this; + } + + public ResumableUploadRequest build() { + return new ResumableUploadRequest<>(this); + } + } +} diff --git a/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadStatus.java b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadStatus.java new file mode 100644 index 000000000000..e5c73670d422 --- /dev/null +++ b/sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/rpc/ResumableUploadStatus.java @@ -0,0 +1,74 @@ +/* + * Copyright 2026 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.rpc; + +import com.google.api.core.BetaApi; + +/** Status snapshot of an ongoing resumable upload. */ +@BetaApi +public final class ResumableUploadStatus { + private final long bytesUploaded; + private final long totalBytes; + private final ResumableUploadProgressListener.State state; + + public ResumableUploadStatus( + long bytesUploaded, long totalBytes, ResumableUploadProgressListener.State state) { + this.bytesUploaded = bytesUploaded; + this.totalBytes = totalBytes; + this.state = state; + } + + /** Returns the number of bytes successfully uploaded to the server so far. */ + public long getBytesUploaded() { + return bytesUploaded; + } + + /** Returns the total size of the stream in bytes, or -1 if unknown. */ + public long getTotalBytes() { + return totalBytes; + } + + /** Returns the current state of the upload session. */ + public ResumableUploadProgressListener.State getState() { + return state; + } + + @Override + public String toString() { + return "ResumableUploadStatus{" + + "bytesUploaded=" + + bytesUploaded + + ", totalBytes=" + + totalBytes + + ", state=" + + state + + '}'; + } +}