Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 52 additions & 64 deletions apps/server/integration/OrchestrationEngineHarness.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type OrchestrationEvent,
type OrchestrationThread,
} from "@t3tools/contracts";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Exit from "effect/Exit";
import * as FileSystem from "effect/FileSystem";
Expand Down Expand Up @@ -120,39 +121,45 @@ class WaitForTimeoutError extends Schema.TaggedErrorClass<WaitForTimeoutError>()
},
) {}

class WaitForRetrySignal extends Schema.TaggedErrorClass<WaitForRetrySignal>()(
"WaitForRetrySignal",
{
description: Schema.String,
},
) {}

const isWaitForRetrySignal = Schema.is(WaitForRetrySignal);
const WAIT_FOR_RETRY_INTERVAL = Duration.millis(10);
const WAIT_FOR_DEFAULT_TIMEOUT = Duration.seconds(40);

function waitFor<A, E>(
read: Effect.Effect<A, E>,
predicate: (value: A) => boolean,
description: string,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<A, never>;
function waitFor<A, B extends A, E>(
read: Effect.Effect<A, E>,
predicate: (value: A) => value is B,
description: string,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<B, never>;
function waitFor<A, E>(
read: Effect.Effect<A, E>,
predicate: (value: A) => boolean,
description: string,
timeoutMs = 40_000,
timeout: Duration.Input = WAIT_FOR_DEFAULT_TIMEOUT,
): Effect.Effect<A, never> {
const RETRY_SIGNAL = "wait_for_retry";
const retryIntervalMs = 10;
const maxRetries = Math.max(0, Math.floor(timeoutMs / retryIntervalMs));
const retrySchedule = Schedule.spaced(`${retryIntervalMs} millis`);

return read.pipe(
Effect.filterOrFail(predicate, () => RETRY_SIGNAL),
Effect.filterOrFail(predicate, () => new WaitForRetrySignal({ description })),
Effect.retry({
schedule: retrySchedule,
times: maxRetries,
while: (error) => error === RETRY_SIGNAL,
schedule: Schedule.spaced(WAIT_FOR_RETRY_INTERVAL),
while: isWaitForRetrySignal,
}),
Effect.timeoutOrElse({
duration: timeout,
orElse: () => Effect.fail(new WaitForTimeoutError({ description })),
}),
Effect.mapError((error) =>
error === RETRY_SIGNAL ? new WaitForTimeoutError({ description }) : error,
),
Effect.orDie,
);
}
Expand Down Expand Up @@ -185,11 +192,11 @@ export interface OrchestrationIntegrationHarness {
readonly waitForThread: (
threadId: string,
predicate: (thread: OrchestrationThread) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
) => Effect.Effect<OrchestrationThread, never>;
readonly waitForDomainEvent: (
predicate: (event: OrchestrationEvent) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
) => Effect.Effect<ReadonlyArray<OrchestrationEvent>, never>;
readonly waitForPendingApproval: (
requestId: string,
Expand All @@ -198,7 +205,7 @@ export interface OrchestrationIntegrationHarness {
readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null;
readonly resolvedAt: string | null;
}) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
) => Effect.Effect<
{
readonly status: "pending" | "resolved";
Expand All @@ -210,11 +217,11 @@ export interface OrchestrationIntegrationHarness {
readonly waitForReceipt: {
(
predicate: (receipt: OrchestrationRuntimeReceipt) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<OrchestrationRuntimeReceipt, never>;
<Receipt extends OrchestrationRuntimeReceipt>(
predicate: (receipt: OrchestrationRuntimeReceipt) => receipt is Receipt,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<Receipt, never>;
};
readonly dispose: Effect.Effect<void, never>;
Expand Down Expand Up @@ -420,98 +427,79 @@ export const makeOrchestrationIntegrationHarness = (
yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) =>
Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid),
).pipe(Effect.forkIn(scope));
yield* Effect.sleep(10);
yield* Effect.sleep(Duration.millis(10));

const waitForThread: OrchestrationIntegrationHarness["waitForThread"] = (
threadId,
predicate,
timeoutMs,
timeout,
) =>
waitFor(
snapshotQuery
.getSnapshot()
.pipe(
Effect.map(
(snapshot) => snapshot.threads.find((thread) => thread.id === threadId) ?? null,
Effect.map((snapshot) =>
Option.fromUndefinedOr(snapshot.threads.find((thread) => thread.id === threadId)),
),
),
(thread): thread is OrchestrationThread => thread !== null && predicate(thread),
(thread) => Option.isSome(thread) && predicate(thread.value),
`projected thread '${threadId}'`,
timeoutMs,
) as Effect.Effect<OrchestrationThread, never>;
timeout,
).pipe(Effect.map(Option.getOrThrow));

const waitForDomainEvent: OrchestrationIntegrationHarness["waitForDomainEvent"] = (
predicate,
timeoutMs,
timeout,
) =>
waitFor(
Stream.runCollect(engine.readEvents(0)).pipe(
Effect.map((chunk): ReadonlyArray<OrchestrationEvent> => Array.from(chunk)),
),
(events) => events.some(predicate),
"domain event",
timeoutMs,
timeout,
);

const waitForPendingApproval: OrchestrationIntegrationHarness["waitForPendingApproval"] = (
requestId,
predicate,
timeoutMs,
timeout,
) =>
waitFor(
pendingApprovalRepository
.getByRequestId({ requestId: ApprovalRequestId.make(requestId) })
.pipe(
Effect.map((row) =>
Option.match(row, {
onNone: () => null,
onSome: (value) => ({
status: value.status,
decision: value.decision,
resolvedAt: value.resolvedAt,
}),
}),
Effect.map(
Option.map((value) => ({
status: value.status,
decision: value.decision,
resolvedAt: value.resolvedAt,
})),
),
),
(
row,
): row is {
readonly status: "pending" | "resolved";
readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null;
readonly resolvedAt: string | null;
} => row !== null && predicate(row),
(row) => Option.isSome(row) && predicate(row.value),
`pending approval '${requestId}'`,
timeoutMs,
) as Effect.Effect<
{
readonly status: "pending" | "resolved";
readonly decision: "accept" | "acceptForSession" | "decline" | "cancel" | null;
readonly resolvedAt: string | null;
},
never
>;
timeout,
).pipe(Effect.map(Option.getOrThrow));

function waitForReceipt(
predicate: (receipt: OrchestrationRuntimeReceipt) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<OrchestrationRuntimeReceipt, never>;
function waitForReceipt<Receipt extends OrchestrationRuntimeReceipt>(
predicate: (receipt: OrchestrationRuntimeReceipt) => receipt is Receipt,
timeoutMs?: number,
timeout?: Duration.Input,
): Effect.Effect<Receipt, never>;
function waitForReceipt(
predicate: (receipt: OrchestrationRuntimeReceipt) => boolean,
timeoutMs?: number,
timeout?: Duration.Input,
) {
const readMatchingReceipt = Ref.get(receiptHistory).pipe(
Effect.map((history) => history.find(predicate)),
Effect.map((history) => Option.fromUndefinedOr(history.find(predicate))),
);

return waitFor(
readMatchingReceipt,
(receipt): receipt is OrchestrationRuntimeReceipt => receipt !== undefined,
"runtime receipt",
timeoutMs,
return waitFor(readMatchingReceipt, Option.isSome, "runtime receipt", timeout).pipe(
Effect.map(Option.getOrThrow),
);
}

Expand Down
Loading
Loading