Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/adr/004-child-context-execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Inner operation IDs are prefixed with the parent context's operation ID using `-

### Per-context replay state

A global `executionMode` doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(contextId)`.
A global `executionMode` doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(contextId)`. `StepContext` does not track replay state because steps are retried by attempt, not replayed as independent contexts.

### Thread model

Expand Down
3 changes: 1 addition & 2 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ SuspendExecutionException # Internal: triggers suspension (not

Terminal states (SUCCEEDED, FAILED, CANCELLED, TIMED_OUT, STOPPED) stay in REPLAY mode since we're just returning cached results.

This is a one-way transition (REPLAY → EXECUTION, never back). `DurableLogger` checks `isReplaying()` to suppress duplicate logs during replay.
This is a one-way transition (REPLAY → EXECUTION, never back). `DurableLogger` checks `DurableContext.isReplaying()` to suppress duplicate logs during replay; `StepContext` logs are attempt-based and are never replay-suppressed.

### MDC-Based Context Enrichment

Expand Down Expand Up @@ -896,4 +896,3 @@ var result = stepFuture.get();
| 6 | `wait()` returns. `stepFuture.get()` → result already available. | — | — |

If the wait duration hasn't elapsed when the step completes, the execution is suspended. If the step finishes *after* the wait, the step thread keeps the execution alive (prevents suspension) while the wait polls to completion.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ static DurableContext getCurrentContext() {
return (DurableContext) BaseContext.getCurrentContext();
}

/** Returns whether this context is currently replaying checkpointed durable operations. */
boolean isReplaying();

/**
* Executes a durable step with the given name and blocks until it completes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,4 @@ static BaseContext getCurrentContext() {

/** Gets the context name for this context. Null for root context. */
String getContextName();

/** Returns whether this context is currently in replay mode. */
boolean isReplaying();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ public abstract class BaseContextImpl implements BaseContext {
private final String contextName;
private final ThreadType threadType;

private boolean isReplaying;

/**
* Creates a new BaseContext instance.
*
Expand All @@ -41,7 +39,6 @@ protected BaseContextImpl(
this.lambdaContext = lambdaContext;
this.contextId = contextId;
this.contextName = contextName;
this.isReplaying = executionManager.hasOperationsForContext(contextId);
this.threadType = threadType;
}

Expand Down Expand Up @@ -99,19 +96,6 @@ public ExecutionManager getExecutionManager() {
return executionManager;
}

/** Returns whether this context is currently in replay mode. */
@Override
public boolean isReplaying() {
return isReplaying;
}

/**
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
*/
public void setExecutionMode() {
this.isReplaying = false;
}

/** Returns a durable logger for this context. */
public DurableLogger getLogger() {
return DurableLogger.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex
private final OperationIdGenerator operationIdGenerator;
private final DurableContextImpl parentContext;
private final boolean isVirtual;
private boolean isReplaying;

/** Shared initialization — sets all fields. */
private DurableContextImpl(
Expand All @@ -74,6 +75,7 @@ private DurableContextImpl(
operationIdGenerator = new OperationIdGenerator(contextId);
this.parentContext = parentContext;
this.isVirtual = isVirtual;
this.isReplaying = executionManager.hasOperationsForContext(contextId);
}

/**
Expand Down Expand Up @@ -437,6 +439,19 @@ private String nextOperationId() {
return operationIdGenerator.nextOperationId();
}

/** Returns whether this context is currently in replay mode. */
@Override
public boolean isReplaying() {
return isReplaying;
}

/**
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
*/
public void setExecutionMode() {
this.isReplaying = false;
}

/**
* Get the parent context ID for its child operations, which always points to a non-virtual context
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
/**
* Context available inside a step operation's user function.
*
* <p>Provides access to the current retry attempt number and a logger that includes execution metadata. Extends
* {@link BaseContext} for thread lifecycle management.
* <p>Provides access to the current retry attempt number and a logger that includes execution metadata. Steps are
* retried by attempt rather than replayed, so this context does not track replay state.
*/
public class StepContextImpl extends BaseContextImpl implements StepContext {
private final int attempt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ public void error(String message, Throwable t) {
}

private boolean shouldSuppress(BaseContext context) {
return context.getDurableConfig().getLoggerConfig().suppressReplayLogs() && context.isReplaying();
return context instanceof DurableContext durableContext
&& context.getDurableConfig().getLoggerConfig().suppressReplayLogs()
&& durableContext.isReplaying();
}

private void log(Runnable logAction) {
var threadLocalContext = BaseContext.getCurrentContext();
if (threadLocalContext == null || !shouldSuppress(threadLocalContext)) {
if (!shouldSuppress(threadLocalContext)) {
logAction.run();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ void setsExecutionMdcOnFirstLog() {
@Test
void setStepThreadPropertiesSetsMdc() {
var logger = new DurableLogger(new RecordingLogger().delegate());
var replaying = new AtomicBoolean(false);

BaseContextImpl.setCurrentContext(
createStepContext(replaying, LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2));
createStepContext(LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2));
DurableLogger.attachContext();
try {
logger.info("step log");
Expand Down Expand Up @@ -145,6 +144,20 @@ void clearThreadPropertiesRemovesMdc() {
assertNull(MDC.get(DurableLogger.MDC_REQUEST_ID));
}

@Test
void stepLogsAreNotSuppressed() {
var recordingLogger = new RecordingLogger();
var logger = new DurableLogger(recordingLogger.delegate());

withContext(createStepContext(LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2), () -> {
logger.info("step logs should always emit");
});

assertEquals(1, recordingLogger.calls().size());
assertEquals(
"step logs should always emit", recordingLogger.calls().get(0).message());
}

@Test
void replayModeTransitionAllowsSubsequentLogs() {
var recordingLogger = new RecordingLogger();
Expand Down Expand Up @@ -223,12 +236,7 @@ private static DurableContext createDurableContext(
}

private static StepContext createStepContext(
AtomicBoolean replaying,
LoggerConfig loggerConfig,
String requestId,
String operationId,
String operationName,
int attempt) {
LoggerConfig loggerConfig, String requestId, String operationId, String operationName, int attempt) {
return (StepContext) Proxy.newProxyInstance(
StepContext.class.getClassLoader(),
new Class<?>[] {StepContext.class},
Expand All @@ -239,7 +247,6 @@ private static StepContext createStepContext(
case "getContextId" -> operationId;
case "getContextName" -> operationName;
case "getAttempt" -> attempt;
case "isReplaying" -> replaying.get();
case "toString" -> "TestStepContext";
case "hashCode" -> System.identityHashCode(proxy);
case "equals" -> proxy == args[0];
Expand Down
Loading