diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java new file mode 100644 index 000000000..e09e16d11 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java @@ -0,0 +1,44 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.otel; + +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; + +/** + * OTel + X-Ray example: map operation that processes items concurrently. + * + *
Exercises map operation tracing — verifies spans for map operation + each item step.
+ */
+public class OtelXRayMapExample extends DurableHandler Exercises nested context tracing — verifies span hierarchy for outer → inner → step.
+ */
+public class OtelXRayNestedContextExample extends DurableHandler Exercises parallel operation tracing — verifies spans for parallel + branch steps.
+ */
+public class OtelXRayParallelExample extends DurableHandler These verify that the OTel plugin doesn't break execution for various scenarios. When deployed to Lambda with
+ * CloudDurableTestRunner, these same scenarios validate that flush/export works correctly under real concurrency.
+ */
+class OtelXRayCloudTest {
+
+ @Test
+ void mapExample_executesSuccessfully() {
+ var handler = new OtelXRayMapExample();
+ var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
+
+ var result = runner.runUntilComplete(new GreetingRequest("test"));
+
+ assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+ assertEquals("Mapped 3 items", result.getResult(String.class));
+ }
+
+ @Test
+ void parallelExample_executesSuccessfully() {
+ var handler = new OtelXRayParallelExample();
+ var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
+
+ var result = runner.runUntilComplete(new GreetingRequest("test"));
+
+ assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+ assertTrue(result.getResult(String.class).contains("Parallel completed: 2 branches"));
+ }
+
+ @Test
+ void nestedContextExample_executesSuccessfully() {
+ var handler = new OtelXRayNestedContextExample();
+ var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
+
+ var result = runner.runUntilComplete(new GreetingRequest("World"));
+
+ assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+ assertEquals("HELLO, WORLD!", result.getResult(String.class));
+ }
+}
diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java
index af7e10562..3e71e82ef 100644
--- a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java
+++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java
@@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.plugin.DurableExecutionPlugin;
import software.amazon.lambda.durable.plugin.InvocationEndInfo;
import software.amazon.lambda.durable.plugin.InvocationInfo;
@@ -258,16 +259,45 @@ public void onOperationStart(OperationInfo info) {
public void onOperationEnd(OperationEndInfo info) {
if (info.id() == null) return;
- // End the operation span that was started in onOperationStart
var span = operationSpans.remove(info.id());
- if (span == null) return;
- if (info.error() != null) {
- span.setStatus(StatusCode.ERROR, info.error().getMessage());
- span.recordException(info.error());
- }
+ if (span != null) {
+ // Operation was started in this invocation — end normally
+ if (info.error() != null) {
+ span.setStatus(StatusCode.ERROR, info.error().getMessage());
+ span.recordException(info.error());
+ }
+ span.end();
+ } else {
+ // Operation was started in a prior invocation — create a continuation span with Link
+ // to the deterministic span ID from the original invocation.
+ var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id());
+ var traceId = idGenerator.generateTraceId();
+ var linkedSpanContext =
+ SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault());
+
+ var parentContext = resolveParentContext(info.parentId());
+
+ var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name()))
+ .setParent(parentContext)
+ .addLink(linkedSpanContext)
+ .setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn)
+ .setAttribute(DURABLE_OPERATION_ID, info.id())
+ .setAttribute(DURABLE_OPERATION_TYPE, info.type());
- span.end();
+ if (info.name() != null) {
+ spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name());
+ }
+
+ var continuationSpan = spanBuilder.startSpan();
+
+ if (info.error() != null) {
+ continuationSpan.setStatus(StatusCode.ERROR, info.error().getMessage());
+ continuationSpan.recordException(info.error());
+ }
+
+ continuationSpan.end();
+ }
}
// ─── User function hooks ─────────────────────────────────────────────
@@ -327,8 +357,12 @@ public void onUserFunctionEnd(UserFunctionEndInfo info) {
span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome);
if (!info.succeeded() && info.error() != null) {
- span.setStatus(StatusCode.ERROR, info.error().getMessage());
- span.recordException(info.error());
+ if (info.error() instanceof SuspendExecutionException) {
+ span.setAttribute(DURABLE_ATTEMPT_OUTCOME, "pending");
+ } else {
+ span.setStatus(StatusCode.ERROR, info.error().getMessage());
+ span.recordException(info.error());
+ }
}
if (info.endTimestamp() != null) {
diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java
index b3a2fe4bb..ca143c570 100644
--- a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java
+++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java
@@ -11,6 +11,7 @@
import java.time.Instant;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.plugin.*;
class OpenTelemetryDurablePluginTest {
@@ -417,4 +418,195 @@ void xrayExtraction_extractedTraceIdMatchesXrayConversion() {
var spans = spanExporter.getFinishedSpanItems();
assertEquals(expectedOtelTraceId, spans.get(0).getTraceId());
}
+
+ // ─── Cross-invocation continuation span tests ────────────────────────
+
+ @Test
+ void operationEnd_withoutMatchingStart_createsContinuationSpanWithLink() {
+ plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
+
+ // onOperationEnd without a prior onOperationStart — operation completed between invocations
+ plugin.onOperationEnd(
+ new OperationEndInfo("op-wait-1", "my-wait", "WAIT", "Wait", null, Instant.now(), Instant.now(), null));
+
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null));
+
+ var spans = spanExporter.getFinishedSpanItems();
+ assertEquals(2, spans.size());
+
+ var continuationSpan = spans.stream()
+ .filter(s -> s.getName().contains("wait"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals("durable.wait:my-wait", continuationSpan.getName());
+ assertFalse(continuationSpan.getLinks().isEmpty(), "Continuation span should have a Link");
+ }
+
+ @Test
+ void operationEnd_withoutMatchingStart_withError_setsErrorStatus() {
+ plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
+
+ plugin.onOperationEnd(new OperationEndInfo(
+ "op-cb-1",
+ "my-callback",
+ "CALLBACK",
+ "Callback",
+ null,
+ Instant.now(),
+ Instant.now(),
+ new RuntimeException("timed out")));
+
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null));
+
+ var continuationSpan = spanExporter.getFinishedSpanItems().stream()
+ .filter(s -> s.getName().contains("callback"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(StatusCode.ERROR, continuationSpan.getStatus().getStatusCode());
+ assertFalse(continuationSpan.getLinks().isEmpty());
+ }
+
+ // ─── SuspendExecutionException handling ──────────────────────────────
+
+ @Test
+ void userFunctionEnd_withSuspendException_setsOutcomePending() {
+ plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
+
+ plugin.onUserFunctionStart(new UserFunctionStartInfo(
+ "op-1", "child-ctx", "CONTEXT", "RunInChildContext", null, Instant.now(), false, null));
+
+ plugin.onUserFunctionEnd(new UserFunctionEndInfo(
+ "op-1",
+ "child-ctx",
+ "CONTEXT",
+ "RunInChildContext",
+ null,
+ Instant.now(),
+ Instant.now(),
+ false,
+ null,
+ false,
+ new SuspendExecutionException()));
+
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.PENDING, null));
+
+ var attemptSpan = spanExporter.getFinishedSpanItems().stream()
+ .filter(s -> s.getName().contains("child-ctx"))
+ .findFirst()
+ .orElseThrow();
+ // Should NOT have ERROR status — suspension is not an error
+ assertNotEquals(StatusCode.ERROR, attemptSpan.getStatus().getStatusCode());
+ }
+
+ // ─── Attempt span cleanup at invocation end ──────────────────────────
+
+ @Test
+ void attemptSpan_endedAtInvocationEnd_whenUserFunctionEndNotCalled() {
+ plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
+
+ // Start attempt but never call onUserFunctionEnd (simulates crash before end hook)
+ plugin.onUserFunctionStart(
+ new UserFunctionStartInfo("op-1", "running", "STEP", "Step", null, Instant.now(), false, 1));
+
+ // Invocation ends — attempt span should be cleaned up
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.PENDING, null));
+
+ var spans = spanExporter.getFinishedSpanItems();
+ var attemptSpan = spans.stream()
+ .filter(s -> s.getName().contains("running"))
+ .findFirst()
+ .orElseThrow();
+ assertNotNull(attemptSpan, "Attempt span should be exported even without onUserFunctionEnd");
+ }
+
+ // ─── Parent resolution with parentId ─────────────────────────────────
+
+ @Test
+ void childOperation_parentedToParentOperationSpan() {
+ plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
+
+ // Parent context operation
+ plugin.onOperationStart(new OperationInfo(
+ "op-parent", "my-context", "CONTEXT", "RunInChildContext", null, Instant.now(), null));
+
+ // Child operation with parentId pointing to parent
+ plugin.onOperationStart(
+ new OperationInfo("op-child", "inner-step", "STEP", "Step", "op-parent", Instant.now(), null));
+ plugin.onOperationEnd(new OperationEndInfo(
+ "op-child", "inner-step", "STEP", "Step", "op-parent", Instant.now(), Instant.now(), null));
+
+ plugin.onOperationEnd(new OperationEndInfo(
+ "op-parent", "my-context", "CONTEXT", "RunInChildContext", null, Instant.now(), Instant.now(), null));
+
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null));
+
+ var spans = spanExporter.getFinishedSpanItems();
+
+ var parentSpan = spans.stream()
+ .filter(s -> s.getName().contains("context"))
+ .findFirst()
+ .orElseThrow();
+ var childSpan = spans.stream()
+ .filter(s -> s.getName().contains("inner-step"))
+ .findFirst()
+ .orElseThrow();
+
+ assertEquals(
+ parentSpan.getSpanId(),
+ childSpan.getParentSpanId(),
+ "Child operation should be parented to parent operation span");
+ }
+
+ // ─── Multi-invocation step-wait-step scenario ────────────────────────
+
+ @Test
+ void multiInvocation_stepWaitStep_producesCorrectSpans() {
+ var arn = "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1";
+
+ // Invocation 1: step completes, wait starts
+ plugin.onInvocationStart(new InvocationInfo("req-1", arn, true));
+ plugin.onOperationStart(new OperationInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), null));
+ plugin.onUserFunctionStart(
+ new UserFunctionStartInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), false, 1));
+ plugin.onUserFunctionEnd(new UserFunctionEndInfo(
+ "op-1", "step-A", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null));
+ plugin.onOperationEnd(
+ new OperationEndInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), Instant.now(), null));
+ plugin.onOperationStart(new OperationInfo("op-2", "pause", "WAIT", "Wait", null, Instant.now(), null));
+ plugin.onInvocationEnd(new InvocationEndInfo("req-1", arn, true, InvocationStatus.PENDING, null));
+
+ // Invocation 1 should have: step op + step attempt + wait (PENDING) + invocation = 4
+ assertEquals(4, spanExporter.getFinishedSpanItems().size());
+ var inv1TraceId = spanExporter.getFinishedSpanItems().get(0).getTraceId();
+
+ spanExporter.reset();
+
+ // Invocation 2: wait completed between invocations, new step runs
+ plugin.onInvocationStart(new InvocationInfo("req-2", arn, false));
+ plugin.onOperationEnd(
+ new OperationEndInfo("op-2", "pause", "WAIT", "Wait", null, Instant.now(), Instant.now(), null));
+ plugin.onOperationStart(new OperationInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), null));
+ plugin.onUserFunctionStart(
+ new UserFunctionStartInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), false, 1));
+ plugin.onUserFunctionEnd(new UserFunctionEndInfo(
+ "op-3", "step-B", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null));
+ plugin.onOperationEnd(
+ new OperationEndInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), Instant.now(), null));
+ plugin.onInvocationEnd(new InvocationEndInfo("req-2", arn, false, InvocationStatus.SUCCEEDED, null));
+
+ var inv2Spans = spanExporter.getFinishedSpanItems();
+ // wait continuation + step-B op + step-B attempt + invocation = 4
+ assertEquals(4, inv2Spans.size());
+
+ // Same trace ID across invocations
+ var inv2TraceId = inv2Spans.get(0).getTraceId();
+ assertEquals(inv1TraceId, inv2TraceId);
+
+ // Wait continuation should have a Link
+ var waitContinuation = inv2Spans.stream()
+ .filter(s -> s.getName().contains("wait"))
+ .findFirst()
+ .orElseThrow();
+ assertFalse(waitContinuation.getLinks().isEmpty());
+ }
}
diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java
index 3b1509382..5c9cc4b97 100644
--- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java
+++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java
@@ -15,6 +15,7 @@
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.model.ExecutionStatus;
+import software.amazon.lambda.durable.model.WaitForConditionResult;
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;
import software.amazon.lambda.durable.retry.RetryStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
@@ -331,4 +332,185 @@ private static void assertSpanExists(List