Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/** Tests that nested step calling is properly rejected. */
Expand All @@ -16,9 +18,13 @@ void nestedStepCallingThrowsIllegalStateException() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
// outer-step's supplier calls context.step() which internally calls stepAsync().get()
// The get() is called from the outer step's thread (named "1-step"), triggering the check
var future = context.stepAsync("outer-step", String.class, stepCtx -> {
return context.step("inner-step", String.class, stepCtx2 -> "inner-result");
});
var future = context.stepAsync(
"outer-step",
String.class,
stepCtx -> context.step("inner-step", String.class, stepCtx2 -> "inner-result"),
StepConfig.builder()
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
return future.get();
});

Expand All @@ -38,10 +44,16 @@ void awaitingAsyncStepInsideSyncStepThrowsIllegalStateException() {
var asyncFuture = context.stepAsync("async-step", String.class, stepCtx -> "async-result");

// Sync step tries to await the async step's result inside its supplier
return context.step("sync-step", String.class, stepCtx -> {
// This get() is called from sync-step's thread ("2-step"), which is not allowed
return "combined: " + asyncFuture.get();
});
return context.step(
"sync-step",
String.class,
stepCtx -> {
// This get() is called from sync-step's thread ("2-step"), which is not allowed
return "combined: " + asyncFuture.get();
},
StepConfig.builder()
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
});

var result = runner.run("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,15 @@ protected List<Operation> getChildOperations() {
/**
* Checks if it's called from a Step.
*
* @throws IllegalDurableOperationException if it's in a step
* @throws IllegalStateException if it's in a step
*/
private void validateCurrentThreadType() {
ThreadType current = getCurrentThreadContext().threadType();
if (current == ThreadType.STEP) {
var message = String.format(
"Nested %s operation is not supported on %s from within a %s execution.",
getType(), getName(), current);
// terminate execution and throw the exception
throw terminateExecutionWithIllegalDurableOperationException(message);
throw new IllegalStateException(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,31 @@ public String get() {
verify(executionManager).terminateExecution(any(IllegalDurableOperationException.class));
}

@Test
void waitForOperationCompletionThrowsIllegalStateExceptionWhenCalledFromStepThread() {
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext(CONTEXT_ID, ThreadType.STEP));

SerializableDurableOperation<String> op =
new SerializableDurableOperation<>(OPERATION_IDENTIFIER, RESULT_TYPE, SER_DES, durableContext) {
@Override
protected void start() {
markAlreadyCompleted();
assertThrows(IllegalStateException.class, this::waitForOperationCompletion);
}

@Override
protected void replay(Operation existing) {}

@Override
public String get() {
return RESULT;
}
};

op.execute();
verify(executionManager, never()).terminateExecution(any(IllegalDurableOperationException.class));
}

@Test
void waitForOperationCompletionWhenRunningAndReadyToComplete()
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down