Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type OrchestrationSession,
ThreadId,
type ProviderSession,
type ProviderSendTurnInput,
type RuntimeMode,
type TurnId,
} from "@t3tools/contracts";
Expand All @@ -29,6 +30,7 @@ import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts";
import { increment, orchestrationEventsProcessedTotal } from "../../observability/Metrics.ts";
import { ProviderAdapterRequestError } from "../../provider/Errors.ts";
import type { ProviderServiceError } from "../../provider/Errors.ts";
import { classifyProviderServiceFailure } from "../../provider/providerFallback.ts";
import { TextGeneration } from "../../textGeneration/TextGeneration.ts";
import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { ProviderRegistry } from "../../provider/Services/ProviderRegistry.ts";
Expand All @@ -41,6 +43,8 @@ import {
import { ServerSettingsService } from "../../serverSettings.ts";
import { VcsStatusBroadcaster } from "../../vcs/VcsStatusBroadcaster.ts";
import { GitWorkflowService } from "../../git/GitWorkflowService.ts";
import { attemptProviderFallback } from "../providerFallbackWorkflow.ts";
import { completeProviderFallbackChain } from "../providerFallbackChain.ts";
const isProviderAdapterRequestError = Schema.is(ProviderAdapterRequestError);
const isProviderDriverKind = Schema.is(ProviderDriverKind);

Expand Down Expand Up @@ -770,6 +774,10 @@ const make = Effect.gen(function* () {
return;
}

// A visible user turn starts a new fallback chain. Mid-task fallback turns
// bypass this reactor, so their attempted-instance history remains intact.
completeProviderFallbackChain(thread.id);

const isFirstUserMessageTurn =
thread.messages.filter((entry) => entry.role === "user").length === 1;
if (isFirstUserMessageTurn) {
Expand Down Expand Up @@ -825,8 +833,54 @@ const make = Effect.gen(function* () {
);
};

const recoverTurnStartFailure = (cause: Cause.Cause<unknown>) =>
handleTurnStartFailure(cause).pipe(
const attemptFallbackBeforeReporting = Effect.fnUntraced(function* (
cause: Cause.Cause<unknown>,
attemptedSendTurnInput?: ProviderSendTurnInput,
) {
const failure = classifyProviderServiceFailure(cause);
if (!failure) return false;
const modelSelection =
attemptedSendTurnInput?.modelSelection ??
event.payload.modelSelection ??
thread.modelSelection;
const sendTurnInput: ProviderSendTurnInput = attemptedSendTurnInput ?? {
threadId: event.payload.threadId,
...(toNonEmptyProviderInput(message.text)
? { input: toNonEmptyProviderInput(message.text) }
: {}),
...(message.attachments && message.attachments.length > 0
? { attachments: message.attachments }
: {}),
modelSelection,
interactionMode: event.payload.interactionMode,
};
const fallback = yield* attemptProviderFallback({
threadId: event.payload.threadId,
failedInstanceId: modelSelection.instanceId,
modelSelection,
runtimeMode: event.payload.runtimeMode,
sendTurnInput,
failure,
requireCompatibleContinuation: !isFirstUserMessageTurn,
createdAt: event.payload.createdAt,
});
Comment thread
cursor[bot] marked this conversation as resolved.
return fallback.switched;
});

const recoverTurnStartFailure = (
cause: Cause.Cause<unknown>,
attemptedSendTurnInput?: ProviderSendTurnInput,
) =>
attemptFallbackBeforeReporting(cause, attemptedSendTurnInput).pipe(
Effect.catchCause((fallbackCause) =>
Effect.logWarning("provider command reactor fallback attempt failed", {
eventType: event.type,
threadId: event.payload.threadId,
cause: Cause.pretty(fallbackCause),
originalCause: Cause.pretty(cause),
}).pipe(Effect.as(false)),
),
Effect.flatMap((switched) => (switched ? Effect.void : handleTurnStartFailure(cause))),
Effect.catchCause((recoveryCause) =>
Effect.logWarning("provider command reactor failed to recover turn start failure", {
eventType: event.type,
Expand All @@ -848,16 +902,17 @@ const make = Effect.gen(function* () {
createdAt: event.payload.createdAt,
}).pipe(
Effect.map(Option.some),
Effect.catchCause((cause) => handleTurnStartFailure(cause).pipe(Effect.as(Option.none()))),
Effect.catchCause((cause) => recoverTurnStartFailure(cause).pipe(Effect.as(Option.none()))),
);

if (Option.isNone(sendTurnRequest)) {
return;
}

yield* providerService
.sendTurn(sendTurnRequest.value)
.pipe(Effect.catchCause(recoverTurnStartFailure), Effect.forkScoped);
yield* providerService.sendTurn(sendTurnRequest.value).pipe(
Effect.catchCause((cause) => recoverTurnStartFailure(cause, sendTurnRequest.value)),
Effect.forkScoped,
);
});

const processTurnInterruptRequested = Effect.fn("processTurnInterruptRequested")(function* (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
ProviderService,
type ProviderServiceShape,
} from "../../provider/Services/ProviderService.ts";
import { makeProviderRegistryLayer } from "../../provider/testUtils/providerRegistryMock.ts";
import * as RepositoryIdentityResolver from "../../project/RepositoryIdentityResolver.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
Expand Down Expand Up @@ -238,6 +239,7 @@ describe("ProviderRuntimeIngestion", () => {
Layer.provideMerge(projectionSnapshotLayer),
Layer.provideMerge(SqlitePersistenceMemory),
Layer.provideMerge(Layer.succeed(ProviderService, provider.service)),
Layer.provideMerge(makeProviderRegistryLayer([])),
Layer.provideMerge(makeTestServerSettingsLayer(options?.serverSettings)),
Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())),
Layer.provideMerge(NodeServices.layer),
Expand Down
91 changes: 91 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import * as Stream from "effect/Stream";
import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker";

import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { classifyProviderRuntimeFailure } from "../../provider/providerFallback.ts";
import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts";
import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts";
import { isGitRepository } from "../../git/Utils.ts";
Expand All @@ -38,6 +39,9 @@ import {
type ProviderRuntimeIngestionShape,
} from "../Services/ProviderRuntimeIngestion.ts";
import { ServerSettingsService } from "../../serverSettings.ts";
import { attemptProviderFallback } from "../providerFallbackWorkflow.ts";
import { decideProviderFallbackTrialEvent } from "../providerFallbackTrialGate.ts";
import { completeProviderFallbackChain } from "../providerFallbackChain.ts";

const providerTurnKey = (threadId: ThreadId, turnId: TurnId) => `${threadId}:${turnId}`;

Expand All @@ -54,6 +58,8 @@ const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000;
const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120);
const MAX_BUFFERED_ASSISTANT_CHARS = 24_000;
const HANDLED_FALLBACK_EVENT_CACHE_CAPACITY = 10_000;
const HANDLED_FALLBACK_EVENT_TTL = Duration.minutes(120);
const STRICT_PROVIDER_LIFECYCLE_GUARD = process.env.T3CODE_STRICT_PROVIDER_LIFECYCLE_GUARD !== "0";

type TurnStartRequestedDomainEvent = Extract<
Expand Down Expand Up @@ -666,6 +672,12 @@ const make = Effect.gen(function* () {
lookup: () => Effect.succeed({ text: "", createdAt: "" }),
});

const handledFallbackEvents = yield* Cache.make<string, true>({
capacity: HANDLED_FALLBACK_EVENT_CACHE_CAPACITY,
timeToLive: HANDLED_FALLBACK_EVENT_TTL,
lookup: () => Effect.succeed(true),
});

const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) {
return yield* projectionSnapshotQuery
.getThreadDetailById(threadId)
Expand Down Expand Up @@ -1208,6 +1220,32 @@ const make = Effect.gen(function* () {
const thread = yield* resolveThreadShell(event.threadId);
if (!thread) return;

// A fallback candidate can emit before sendTurn confirms the handoff.
// Hold those events until the trial commits, or discard them if it rolls
// back, so provisional output never leaks into the thread projection.
const fallbackTrialDecision =
event.providerInstanceId === undefined
? "not-trial"
: yield* decideProviderFallbackTrialEvent(
thread.id,
event.providerInstanceId,
event.createdAt,
);
if (fallbackTrialDecision === "reject") {
return;
}

// Ignore events from an instance that no longer owns the thread. A
// committed trial is accepted during the narrow projection handoff.
if (
event.providerInstanceId !== undefined &&
thread.session?.providerInstanceId !== undefined &&
event.providerInstanceId !== thread.session.providerInstanceId &&
fallbackTrialDecision !== "accept"
) {
return;
Comment thread
cursor[bot] marked this conversation as resolved.
}

let loadedThreadDetail: OrchestrationThread | null | undefined;
const getLoadedThreadDetail = () =>
Effect.gen(function* () {
Expand All @@ -1222,6 +1260,59 @@ const make = Effect.gen(function* () {
const eventTurnId = toTurnId(event.turnId);
const activeTurnId = thread.session?.activeTurnId ?? null;

const fallbackFailure = classifyProviderRuntimeFailure(event);
const fallbackInstanceId = event.providerInstanceId ?? thread.session?.providerInstanceId;
if (
fallbackFailure &&
fallbackInstanceId !== undefined &&
(activeTurnId !== null || eventTurnId !== undefined)
) {
const fallbackKey = `${thread.id}:${fallbackInstanceId}:${eventTurnId ?? activeTurnId ?? event.eventId}`;
const handled = yield* Cache.getOption(handledFallbackEvents, fallbackKey);
if (Option.isNone(handled)) {
yield* Cache.set(handledFallbackEvents, fallbackKey, true);
const fallback = yield* attemptProviderFallback({
threadId: thread.id,
failedInstanceId: fallbackInstanceId,
modelSelection: thread.modelSelection,
runtimeMode: thread.runtimeMode,
sendTurnInput: {
threadId: thread.id,
input: "Continue.",
modelSelection: thread.modelSelection,
interactionMode: thread.interactionMode,
},
failure: fallbackFailure,
requireCompatibleContinuation: true,
createdAt: now,
}).pipe(
Effect.catchCause((cause) =>
Effect.logWarning("provider runtime fallback attempt failed", {
eventId: event.eventId,
eventType: event.type,
threadId: thread.id,
cause: Cause.pretty(cause),
}).pipe(
Effect.as({
switched: false,
restoredOriginalInstance: false,
skipped: [],
}),
),
),
);
if (fallback.switched || fallback.restoredOriginalInstance) return;
}
}

if (
!fallbackFailure &&
fallbackInstanceId !== undefined &&
(event.type === "turn.completed" || event.type === "session.exited")
) {
completeProviderFallbackChain(thread.id, fallbackInstanceId);
}

const conflictsWithActiveTurn =
activeTurnId !== null && eventTurnId !== undefined && !sameId(activeTurnId, eventTurnId);
const missingTurnForActiveTurn = activeTurnId !== null && eventTurnId === undefined;
Expand Down
72 changes: 72 additions & 0 deletions apps/server/src/orchestration/providerFallbackChain.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ProviderInstanceId, ThreadId } from "@t3tools/contracts";
import { afterEach, describe, expect, it } from "vite-plus/test";

import {
beginProviderFallbackChain,
completeProviderFallbackChain,
markProviderFallbackInstanceAttempted,
resetProviderFallbackChainsForTest,
} from "./providerFallbackChain.ts";

const threadId = ThreadId.make("thread-1");
const first = ProviderInstanceId.make("codex-first");
const second = ProviderInstanceId.make("codex-second");
const third = ProviderInstanceId.make("codex-third");
const origin = {
instanceId: first,
displayName: "Codex First",
failure: { kind: "rate-limit" as const, message: "Usage limit reached." },
modelSelection: { instanceId: first, model: "gpt-5" },
session: undefined,
};

afterEach(resetProviderFallbackChainsForTest);

describe("provider fallback chain", () => {
it("retains every attempted instance across consecutive runtime failures", () => {
expect([...beginProviderFallbackChain(threadId, first, origin).attemptedInstanceIds]).toEqual([
first,
]);
markProviderFallbackInstanceAttempted(threadId, second);

const secondAttempt = beginProviderFallbackChain(threadId, second, {
...origin,
instanceId: second,
});
expect([...secondAttempt.attemptedInstanceIds]).toEqual([first, second]);
expect(secondAttempt.origin).toEqual(origin);
markProviderFallbackInstanceAttempted(threadId, third);

expect([...beginProviderFallbackChain(threadId, third, origin).attemptedInstanceIds]).toEqual([
first,
second,
third,
]);
});

it("starts a fresh chain after the active instance completes", () => {
beginProviderFallbackChain(threadId, first, origin);
markProviderFallbackInstanceAttempted(threadId, second);
completeProviderFallbackChain(threadId, second);

expect([
...beginProviderFallbackChain(threadId, second, {
...origin,
instanceId: second,
}).attemptedInstanceIds,
]).toEqual([second]);
});

it("does not let a stale instance complete the current chain", () => {
beginProviderFallbackChain(threadId, first, origin);
markProviderFallbackInstanceAttempted(threadId, second);
completeProviderFallbackChain(threadId, first);

expect([
...beginProviderFallbackChain(threadId, second, {
...origin,
instanceId: second,
}).attemptedInstanceIds,
]).toEqual([first, second]);
});
});
Loading
Loading