From 66d9fcf04e852f2278a4dffda866e24eb4e254ba Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 21 Jun 2026 16:13:55 +0000 Subject: [PATCH] Use Option and Duration in Effect server paths Co-authored-by: Julius Marminge --- .../OrchestrationEngineHarness.integration.ts | 116 ++++---- .../src/cloud/ManagedEndpointRuntime.test.ts | 239 ++++++++++------- .../src/cloud/ManagedEndpointRuntime.ts | 249 ++++++++++-------- apps/server/src/cloud/http.ts | 4 +- apps/server/src/server.test.ts | 9 +- 5 files changed, 337 insertions(+), 280 deletions(-) diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index ebc4f984b86..a5695bbdab4 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -9,6 +9,7 @@ import { type OrchestrationEvent, type OrchestrationThread, } from "@t3tools/contracts"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Exit from "effect/Exit"; import * as FileSystem from "effect/FileSystem"; @@ -120,39 +121,45 @@ class WaitForTimeoutError extends Schema.TaggedErrorClass() }, ) {} +class WaitForRetrySignal extends Schema.TaggedErrorClass()( + "WaitForRetrySignal", + { + description: Schema.String, + }, +) {} + +const isWaitForRetrySignal = Schema.is(WaitForRetrySignal); +const WAIT_FOR_RETRY_INTERVAL = Duration.millis(10); +const WAIT_FOR_DEFAULT_TIMEOUT = Duration.seconds(40); + function waitFor( read: Effect.Effect, predicate: (value: A) => boolean, description: string, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; function waitFor( read: Effect.Effect, predicate: (value: A) => value is B, description: string, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; function waitFor( read: Effect.Effect, predicate: (value: A) => boolean, description: string, - timeoutMs = 40_000, + timeout: Duration.Input = WAIT_FOR_DEFAULT_TIMEOUT, ): Effect.Effect { - const RETRY_SIGNAL = "wait_for_retry"; - const retryIntervalMs = 10; - const maxRetries = Math.max(0, Math.floor(timeoutMs / retryIntervalMs)); - const retrySchedule = Schedule.spaced(`${retryIntervalMs} millis`); - return read.pipe( - Effect.filterOrFail(predicate, () => RETRY_SIGNAL), + Effect.filterOrFail(predicate, () => new WaitForRetrySignal({ description })), Effect.retry({ - schedule: retrySchedule, - times: maxRetries, - while: (error) => error === RETRY_SIGNAL, + schedule: Schedule.spaced(WAIT_FOR_RETRY_INTERVAL), + while: isWaitForRetrySignal, + }), + Effect.timeoutOrElse({ + duration: timeout, + orElse: () => Effect.fail(new WaitForTimeoutError({ description })), }), - Effect.mapError((error) => - error === RETRY_SIGNAL ? new WaitForTimeoutError({ description }) : error, - ), Effect.orDie, ); } @@ -185,11 +192,11 @@ export interface OrchestrationIntegrationHarness { readonly waitForThread: ( threadId: string, predicate: (thread: OrchestrationThread) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ) => Effect.Effect; readonly waitForDomainEvent: ( predicate: (event: OrchestrationEvent) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ) => Effect.Effect, never>; readonly waitForPendingApproval: ( requestId: string, @@ -198,7 +205,7 @@ export interface OrchestrationIntegrationHarness { readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null; readonly resolvedAt: string | null; }) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ) => Effect.Effect< { readonly status: "pending" | "resolved"; @@ -210,11 +217,11 @@ export interface OrchestrationIntegrationHarness { readonly waitForReceipt: { ( predicate: (receipt: OrchestrationRuntimeReceipt) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; ( predicate: (receipt: OrchestrationRuntimeReceipt) => receipt is Receipt, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; }; readonly dispose: Effect.Effect; @@ -420,29 +427,29 @@ export const makeOrchestrationIntegrationHarness = ( yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) => Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid), ).pipe(Effect.forkIn(scope)); - yield* Effect.sleep(10); + yield* Effect.sleep(Duration.millis(10)); const waitForThread: OrchestrationIntegrationHarness["waitForThread"] = ( threadId, predicate, - timeoutMs, + timeout, ) => waitFor( snapshotQuery .getSnapshot() .pipe( - Effect.map( - (snapshot) => snapshot.threads.find((thread) => thread.id === threadId) ?? null, + Effect.map((snapshot) => + Option.fromUndefinedOr(snapshot.threads.find((thread) => thread.id === threadId)), ), ), - (thread): thread is OrchestrationThread => thread !== null && predicate(thread), + (thread) => Option.isSome(thread) && predicate(thread.value), `projected thread '${threadId}'`, - timeoutMs, - ) as Effect.Effect; + timeout, + ).pipe(Effect.map(Option.getOrThrow)); const waitForDomainEvent: OrchestrationIntegrationHarness["waitForDomainEvent"] = ( predicate, - timeoutMs, + timeout, ) => waitFor( Stream.runCollect(engine.readEvents(0)).pipe( @@ -450,68 +457,49 @@ export const makeOrchestrationIntegrationHarness = ( ), (events) => events.some(predicate), "domain event", - timeoutMs, + timeout, ); const waitForPendingApproval: OrchestrationIntegrationHarness["waitForPendingApproval"] = ( requestId, predicate, - timeoutMs, + timeout, ) => waitFor( pendingApprovalRepository .getByRequestId({ requestId: ApprovalRequestId.make(requestId) }) .pipe( - Effect.map((row) => - Option.match(row, { - onNone: () => null, - onSome: (value) => ({ - status: value.status, - decision: value.decision, - resolvedAt: value.resolvedAt, - }), - }), + Effect.map( + Option.map((value) => ({ + status: value.status, + decision: value.decision, + resolvedAt: value.resolvedAt, + })), ), ), - ( - row, - ): row is { - readonly status: "pending" | "resolved"; - readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null; - readonly resolvedAt: string | null; - } => row !== null && predicate(row), + (row) => Option.isSome(row) && predicate(row.value), `pending approval '${requestId}'`, - timeoutMs, - ) as Effect.Effect< - { - readonly status: "pending" | "resolved"; - readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null; - readonly resolvedAt: string | null; - }, - never - >; + timeout, + ).pipe(Effect.map(Option.getOrThrow)); function waitForReceipt( predicate: (receipt: OrchestrationRuntimeReceipt) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; function waitForReceipt( predicate: (receipt: OrchestrationRuntimeReceipt) => receipt is Receipt, - timeoutMs?: number, + timeout?: Duration.Input, ): Effect.Effect; function waitForReceipt( predicate: (receipt: OrchestrationRuntimeReceipt) => boolean, - timeoutMs?: number, + timeout?: Duration.Input, ) { const readMatchingReceipt = Ref.get(receiptHistory).pipe( - Effect.map((history) => history.find(predicate)), + Effect.map((history) => Option.fromUndefinedOr(history.find(predicate))), ); - return waitFor( - readMatchingReceipt, - (receipt): receipt is OrchestrationRuntimeReceipt => receipt !== undefined, - "runtime receipt", - timeoutMs, + return waitFor(readMatchingReceipt, Option.isSome, "runtime receipt", timeout).pipe( + Effect.map(Option.getOrThrow), ); } diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts index e0d5924fcc2..675634813e7 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, it } from "@effect/vitest"; import { vi } from "vite-plus/test"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; @@ -81,21 +81,24 @@ function makeHandle(input: { describe("CloudManagedEndpointRuntime", () => { it("classifies Cloudflare connection and warning output", () => { - expect( + assert.equal( ManagedEndpointRuntime.classifyRelayClientOutput( "2026-06-17T02:00:00Z INF Registered tunnel connection connIndex=0", ), - ).toBe("connected"); - expect( + "connected", + ); + assert.equal( ManagedEndpointRuntime.classifyRelayClientOutput( "2026-06-17T02:00:00Z ERR Failed to serve tunnel connection", ), - ).toBe("warning"); - expect( + "warning", + ); + assert.equal( ManagedEndpointRuntime.classifyRelayClientOutput( "2026-06-17T02:00:00Z INF Starting metrics server", ), - ).toBe("debug"); + "debug", + ); }); it.effect("starts, deduplicates, rotates, and stops the Cloudflare connector", () => @@ -123,41 +126,65 @@ describe("CloudManagedEndpointRuntime", () => { ); const runtime = yield* buildCloudManagedEndpointRuntime(spawner); - yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token-1", - tunnelId: "tunnel-1", - tunnelName: "t3-code-env-1", - }); - yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token-1", - tunnelId: "tunnel-1", - tunnelName: "t3-code-env-1", - }); - yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token-2", - tunnelId: "tunnel-1", - tunnelName: "t3-code-env-1", - }); - const stopped = yield* runtime.applyConfig(null); + yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token-1", + tunnelId: "tunnel-1", + tunnelName: "t3-code-env-1", + }), + ); + yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token-1", + tunnelId: "tunnel-1", + tunnelName: "t3-code-env-1", + }), + ); + yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token-2", + tunnelId: "tunnel-1", + tunnelName: "t3-code-env-1", + }), + ); + const stopped = yield* runtime.applyConfig(Option.none()); - expect(spawned.map((command) => command.command)).toEqual(["cloudflared", "cloudflared"]); - expect(spawned.map((command) => command.args)).toEqual([ - ["tunnel", "run"], - ["tunnel", "run"], - ]); - expect(spawned.map((command) => command.options.env?.TUNNEL_TOKEN)).toEqual([ - "token-1", - "token-2", - ]); - expect(spawned.map((command) => command.options.stdout)).toEqual(["pipe", "pipe"]); - expect(spawned.map((command) => command.options.stderr)).toEqual(["pipe", "pipe"]); - expect(spawned.map((command) => command.options.detached)).toEqual([false, false]); - expect(spawned.map((command) => command.options.shell)).toEqual([false, false]); - expect(killed).toEqual([100, 101]); - expect(stopped).toEqual({ status: "disabled" }); + assert.deepEqual( + spawned.map((command) => command.command), + ["cloudflared", "cloudflared"], + ); + assert.deepEqual( + spawned.map((command) => command.args), + [ + ["tunnel", "run"], + ["tunnel", "run"], + ], + ); + assert.deepEqual( + spawned.map((command) => command.options.env?.TUNNEL_TOKEN), + ["token-1", "token-2"], + ); + assert.deepEqual( + spawned.map((command) => command.options.stdout), + ["pipe", "pipe"], + ); + assert.deepEqual( + spawned.map((command) => command.options.stderr), + ["pipe", "pipe"], + ); + assert.deepEqual( + spawned.map((command) => command.options.detached), + [false, false], + ); + assert.deepEqual( + spawned.map((command) => command.options.shell), + [false, false], + ); + assert.deepEqual(killed, [100, 101]); + assert.deepEqual(stopped, { status: "disabled" }); }), ); @@ -178,18 +205,22 @@ describe("CloudManagedEndpointRuntime", () => { ); const runtime = yield* buildCloudManagedEndpointRuntime(spawner); - const started = yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token", - }); - const unsupported = yield* runtime.applyConfig({ - providerKind: "manual", - connectorToken: "manual-token", - }); + const started = yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + }), + ); + const unsupported = yield* runtime.applyConfig( + Option.some({ + providerKind: "manual", + connectorToken: "manual-token", + }), + ); - expect(started.status).toBe("running"); - expect(unsupported).toEqual({ status: "unsupported", providerKind: "manual" }); - expect(killed).toEqual([200]); + assert.equal(started.status, "running"); + assert.deepEqual(unsupported, { status: "unsupported", providerKind: "manual" }); + assert.deepEqual(killed, [200]); }), ); @@ -220,14 +251,20 @@ describe("CloudManagedEndpointRuntime", () => { tunnelId: "tunnel-1", }; - const first = yield* runtime.applyConfig(config); + const first = yield* runtime.applyConfig(Option.some(config)); firstRunning = false; - const second = yield* runtime.applyConfig(config); + const second = yield* runtime.applyConfig(Option.some(config)); - expect(first).toMatchObject({ status: "running", pid: 300 }); - expect(second).toMatchObject({ status: "running", pid: 301 }); - expect(spawned).toEqual([300, 301]); - expect(killed).toEqual([300]); + if (first.status !== "running") { + assert.fail(`Expected first connector to be running, got ${first.status}`); + } + assert.equal(first.pid, 300); + if (second.status !== "running") { + assert.fail(`Expected second connector to be running, got ${second.status}`); + } + assert.equal(second.pid, 301); + assert.deepEqual(spawned, [300, 301]); + assert.deepEqual(killed, [300]); }), ); @@ -260,17 +297,22 @@ describe("CloudManagedEndpointRuntime", () => { ); const runtime = yield* buildCloudManagedEndpointRuntime(spawner); - const started = yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token", - tunnelId: "tunnel-1", - }); + const started = yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + tunnelId: "tunnel-1", + }), + ); yield* Deferred.succeed(firstExit, ChildProcessSpawner.ExitCode(1)); yield* Deferred.await(secondSpawned); - expect(started).toMatchObject({ status: "running", pid: 400 }); - expect(spawned).toEqual([400, 401]); - expect(killed).toEqual([400]); + if (started.status !== "running") { + assert.fail(`Expected connector to be running, got ${started.status}`); + } + assert.equal(started.pid, 400); + assert.deepEqual(spawned, [400, 401]); + assert.deepEqual(killed, [400]); }), ); @@ -301,26 +343,33 @@ describe("CloudManagedEndpointRuntime", () => { const runtime = yield* buildCloudManagedEndpointRuntime(spawner); const first = yield* runtime - .applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token-1", - }) + .applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token-1", + }), + ) .pipe(Effect.forkChild); yield* Deferred.await(firstSpawnEntered); const second = yield* runtime - .applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token-2", - }) + .applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token-2", + }), + ) .pipe(Effect.forkChild); yield* Deferred.succeed(releaseFirstSpawn, undefined); yield* Fiber.join(first); const status = yield* Fiber.join(second); - expect(status).toMatchObject({ status: "running", pid: 501 }); - expect(spawned).toEqual([500, 501]); - expect(killed).toEqual([500]); + if (status.status !== "running") { + assert.fail(`Expected connector to be running, got ${status.status}`); + } + assert.equal(status.pid, 501); + assert.deepEqual(spawned, [500, 501]); + assert.deepEqual(killed, [500]); }), ); @@ -338,17 +387,19 @@ describe("CloudManagedEndpointRuntime", () => { ); const runtime = yield* buildCloudManagedEndpointRuntime(spawner); - const status = yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token", - tunnelId: "tunnel-1", - }); + const status = yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + tunnelId: "tunnel-1", + }), + ); - expect(status).toMatchObject({ - status: "failed", - providerKind: "cloudflare_tunnel", - tunnelId: "tunnel-1", - }); + if (status.status !== "failed") { + assert.fail(`Expected connector spawn to fail, got ${status.status}`); + } + assert.equal(status.providerKind, "cloudflare_tunnel"); + assert.equal(status.tunnelId, "tunnel-1"); }), ); @@ -371,17 +422,19 @@ describe("CloudManagedEndpointRuntime", () => { ), ); - const status = yield* runtime.applyConfig({ - providerKind: "cloudflare_tunnel", - connectorToken: "token", - }); + const status = yield* runtime.applyConfig( + Option.some({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + }), + ); - expect(status).toEqual({ + assert.deepEqual(status, { status: "failed", providerKind: "cloudflare_tunnel", reason: "The relay client is not installed.", }); - expect(spawn).not.toHaveBeenCalled(); + assert.equal(spawn.mock.calls.length, 0); }), ); }); diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index a1d7112a929..a834f507ad1 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -24,9 +24,9 @@ const readRuntimeConfig = Effect.gen(function* () { const secrets = yield* ServerSecretStore.ServerSecretStore; const bytes = yield* secrets.get(CLOUD_ENDPOINT_RUNTIME_CONFIG); if (Option.isNone(bytes)) { - return null; + return Option.none(); } - return Option.getOrNull(decodeRuntimeConfig(bytesToString(bytes.value))); + return decodeRuntimeConfig(bytesToString(bytes.value)); }); export type CloudManagedEndpointRuntimeStatus = @@ -56,7 +56,7 @@ export class CloudManagedEndpointRuntime extends Context.Service< CloudManagedEndpointRuntime, { readonly applyConfig: ( - config: RelayManagedEndpointRuntimeConfig | null, + config: Option.Option, ) => Effect.Effect; } >()("t3/cloud/ManagedEndpointRuntime/CloudManagedEndpointRuntime") {} @@ -84,28 +84,32 @@ function runtimeConfigKey(config: RelayManagedEndpointRuntimeConfig): string { }); } -const stopConnector = (connector: ActiveConnector | null) => - connector - ? Scope.close(connector.scope, Exit.void).pipe( +const stopConnector = (connector: Option.Option) => + Option.match(connector, { + onNone: () => Effect.void, + onSome: (active) => + Scope.close(active.scope, Exit.void).pipe( Effect.tap(() => Effect.logInfo("Relay client stopped", { - pid: Number(connector.child.pid), + pid: Number(active.child.pid), }), ), Effect.ignore, - ) - : Effect.void; + ), + }); export const make = Effect.gen(function* () { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; const relayClient = yield* RelayClient.RelayClient; - const activeRef = yield* Ref.make(null); - const desiredConfigRef = yield* Ref.make(null); + const activeRef = yield* Ref.make>(Option.none()); + const desiredConfigRef = yield* Ref.make>( + Option.none(), + ); const reconcileSemaphore = yield* Semaphore.make(1); let reconcileConfig: CloudManagedEndpointRuntime["Service"]["applyConfig"]; const stopActive = Effect.gen(function* () { - const active = yield* Ref.getAndSet(activeRef, null); + const active = yield* Ref.getAndSet(activeRef, Option.none()); yield* stopConnector(active); }); @@ -116,19 +120,20 @@ export const make = Effect.gen(function* () { Effect.gen(function* () { const active = yield* Ref.get(activeRef); if ( - active?.child.pid !== connector.child.pid || - active.configKey !== connector.configKey + Option.isNone(active) || + active.value.child.pid !== connector.child.pid || + active.value.configKey !== connector.configKey ) { return; } - yield* Ref.set(activeRef, null); - yield* stopConnector(connector); + yield* Ref.set(activeRef, Option.none()); + yield* stopConnector(Option.some(connector)); const desiredConfig = yield* Ref.get(desiredConfigRef); if ( - !desiredConfig || - desiredConfig.providerKind !== "cloudflare_tunnel" || - runtimeConfigKey(desiredConfig) !== connector.configKey + Option.isNone(desiredConfig) || + desiredConfig.value.providerKind !== "cloudflare_tunnel" || + runtimeConfigKey(desiredConfig.value) !== connector.configKey ) { return; } @@ -181,120 +186,130 @@ export const make = Effect.gen(function* () { ), ); - reconcileConfig = Effect.fn("CloudManagedEndpointRuntime.reconcileConfig")(function* (config) { - if (!config || config.providerKind !== "cloudflare_tunnel") { + reconcileConfig = Effect.fn("CloudManagedEndpointRuntime.reconcileConfig")( + function* (configOption) { + if (Option.isNone(configOption)) { + yield* stopActive; + return { status: "disabled" }; + } + + const config = configOption.value; + if (config.providerKind !== "cloudflare_tunnel") { + yield* stopActive; + return { status: "unsupported", providerKind: config.providerKind }; + } + + const nextConfigKey = runtimeConfigKey(config); + const active = yield* Ref.get(activeRef); + if (Option.isSome(active) && active.value.configKey === nextConfigKey) { + const isRunning = yield* active.value.child.isRunning.pipe( + Effect.orElseSucceed(() => false), + ); + if (isRunning) { + return { + status: "running", + providerKind: "cloudflare_tunnel", + pid: Number(active.value.child.pid), + ...(active.value.config.tunnelId ? { tunnelId: active.value.config.tunnelId } : {}), + ...(active.value.config.tunnelName + ? { tunnelName: active.value.config.tunnelName } + : {}), + } satisfies CloudManagedEndpointRuntimeStatus; + } + } + yield* stopActive; - return config - ? { status: "unsupported", providerKind: config.providerKind } - : { status: "disabled" }; - } - const nextConfigKey = runtimeConfigKey(config); - const active = yield* Ref.get(activeRef); - if (active?.configKey === nextConfigKey) { - const isRunning = yield* active.child.isRunning.pipe(Effect.orElseSucceed(() => false)); - if (isRunning) { + const executable = yield* relayClient.resolve; + if (executable.status !== "available") { return { - status: "running", + status: "failed", providerKind: "cloudflare_tunnel", - pid: Number(active.child.pid), - ...(active.config.tunnelId ? { tunnelId: active.config.tunnelId } : {}), - ...(active.config.tunnelName ? { tunnelName: active.config.tunnelName } : {}), + reason: + executable.status === "unsupported" + ? `Relay client is unsupported on ${executable.platform}-${executable.arch}.` + : "The relay client is not installed.", + ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), + ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), } satisfies CloudManagedEndpointRuntimeStatus; } - } - yield* stopActive; - - const executable = yield* relayClient.resolve; - if (executable.status !== "available") { - return { - status: "failed", - providerKind: "cloudflare_tunnel", - reason: - executable.status === "unsupported" - ? `Relay client is unsupported on ${executable.platform}-${executable.arch}.` - : "The relay client is not installed.", - ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), - ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), - } satisfies CloudManagedEndpointRuntimeStatus; - } - - const connectorScope = yield* Scope.make("sequential"); - const child = yield* spawner - .spawn( - ChildProcess.make(executable.executablePath, ["tunnel", "run"], { - detached: false, - env: { - ...process.env, - TUNNEL_TOKEN: config.connectorToken, - }, - shell: false, - stderr: "pipe", - stdout: "pipe", - }), - ) - .pipe( - Effect.provideService(Scope.Scope, connectorScope), - Effect.tap((child) => - Effect.logInfo("Relay client process started; waiting for tunnel connection", { - pid: Number(child.pid), - tunnelId: config.tunnelId, - tunnelName: config.tunnelName, + const connectorScope = yield* Scope.make("sequential"); + const child = yield* spawner + .spawn( + ChildProcess.make(executable.executablePath, ["tunnel", "run"], { + detached: false, + env: { + ...process.env, + TUNNEL_TOKEN: config.connectorToken, + }, + shell: false, + stderr: "pipe", + stdout: "pipe", }), - ), - Effect.catch((cause) => - Effect.logWarning("Failed to start relay client", { - cause, - tunnelId: config.tunnelId, - tunnelName: config.tunnelName, - }).pipe( - Effect.andThen(Scope.close(connectorScope, Exit.void).pipe(Effect.ignore)), - Effect.as({ - status: "failed", - providerKind: "cloudflare_tunnel", - reason: String(cause), - ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), - ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), - } satisfies CloudManagedEndpointRuntimeStatus), + ) + .pipe( + Effect.provideService(Scope.Scope, connectorScope), + Effect.tap((child) => + Effect.logInfo("Relay client process started; waiting for tunnel connection", { + pid: Number(child.pid), + tunnelId: config.tunnelId, + tunnelName: config.tunnelName, + }), ), - ), - ); + Effect.catch((cause) => + Effect.logWarning("Failed to start relay client", { + cause, + tunnelId: config.tunnelId, + tunnelName: config.tunnelName, + }).pipe( + Effect.andThen(Scope.close(connectorScope, Exit.void).pipe(Effect.ignore)), + Effect.as({ + status: "failed", + providerKind: "cloudflare_tunnel", + reason: String(cause), + ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), + ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), + } satisfies CloudManagedEndpointRuntimeStatus), + ), + ), + ); - if ("status" in child && child.status === "failed") { - return child; - } + if ("status" in child && child.status === "failed") { + return child; + } + + if (!("status" in child)) { + const connector = { + child, + scope: connectorScope, + configKey: nextConfigKey, + config, + } satisfies ActiveConnector; + yield* Ref.set(activeRef, Option.some(connector)); + yield* Effect.forkIn(observeConnectorOutput(connector), connectorScope); + yield* Effect.forkIn(superviseConnector(connector), connectorScope); + return { + status: "running", + providerKind: "cloudflare_tunnel", + pid: Number(child.pid), + ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), + ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), + } satisfies CloudManagedEndpointRuntimeStatus; + } - if (!("status" in child)) { - const connector = { - child, - scope: connectorScope, - configKey: nextConfigKey, - config, - } satisfies ActiveConnector; - yield* Ref.set(activeRef, connector); - yield* Effect.forkIn(observeConnectorOutput(connector), connectorScope); - yield* Effect.forkIn(superviseConnector(connector), connectorScope); return { - status: "running", + status: "failed", providerKind: "cloudflare_tunnel", - pid: Number(child.pid), + reason: "Relay client did not start.", ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), } satisfies CloudManagedEndpointRuntimeStatus; - } - - return { - status: "failed", - providerKind: "cloudflare_tunnel", - reason: "Relay client did not start.", - ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), - ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), - } satisfies CloudManagedEndpointRuntimeStatus; - }); + }, + ); const applyConfig = Effect.fn("CloudManagedEndpointRuntime.applyConfig")( - (config: RelayManagedEndpointRuntimeConfig | null) => + (config: Option.Option) => reconcileSemaphore.withPermits(1)( Ref.set(desiredConfigRef, config).pipe(Effect.andThen(reconcileConfig(config))), ), @@ -307,12 +322,12 @@ export const make = Effect.gen(function* () { const initialConfig = yield* readRuntimeConfig.pipe( Effect.catch((cause) => Effect.logWarning("Failed to read managed endpoint runtime config", { cause }).pipe( - Effect.as(null), + Effect.as(Option.none()), ), ), ); yield* runtime.applyConfig(initialConfig); - yield* Effect.addFinalizer(() => runtime.applyConfig(null)); + yield* Effect.addFinalizer(() => runtime.applyConfig(Option.none())); return runtime; }); diff --git a/apps/server/src/cloud/http.ts b/apps/server/src/cloud/http.ts index fc2adca9fbc..892fe31237e 100644 --- a/apps/server/src/cloud/http.ts +++ b/apps/server/src/cloud/http.ts @@ -433,7 +433,7 @@ const applyCloudRelayConfig = Effect.fn("environment.cloud.applyRelayConfig")(fu }); yield* validateCloudMintPublicKey(payload.cloudMintPublicKey); const endpointRuntimeStatus = yield* dependencies.endpointRuntime.applyConfig( - payload.endpointRuntime, + Option.fromNullOr(payload.endpointRuntime), ); const ok = endpointRuntimeStatus.status === "disabled" || endpointRuntimeStatus.status === "running"; @@ -642,7 +642,7 @@ const cloudLinkStateHandler = Effect.fn("environment.cloud.linkState")( const cloudUnlinkHandler = Effect.fn("environment.cloud.unlink")( function* (dependencies: CloudHttpDependencies) { yield* requireEnvironmentScope(AuthRelayWriteScope); - const endpointRuntimeStatus = yield* dependencies.endpointRuntime.applyConfig(null); + const endpointRuntimeStatus = yield* dependencies.endpointRuntime.applyConfig(Option.none()); yield* Effect.all( [ dependencies.secrets.remove(CLOUD_LINKED_USER_ID), diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index e1daf20ed57..6efc88f7d44 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -2253,16 +2253,17 @@ it.layer(NodeServices.layer)("server router seam", (it) => { layers: { cloudManagedEndpointRuntime: { applyConfig: (config) => { - appliedRuntimeConfigs.push(config); - if (!config) { + appliedRuntimeConfigs.push(Option.getOrNull(config)); + if (Option.isNone(config)) { return Effect.succeed({ status: "disabled" }); } + const runtimeConfig = config.value; return Effect.succeed({ status: "running", providerKind: "cloudflare_tunnel", pid: 123, - ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), - ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), + ...(runtimeConfig.tunnelId ? { tunnelId: runtimeConfig.tunnelId } : {}), + ...(runtimeConfig.tunnelName ? { tunnelName: runtimeConfig.tunnelName } : {}), }); }, },