From eac52b69456578898f505f87926e2e363b3835c3 Mon Sep 17 00:00:00 2001 From: Ayushi Ahjolia Date: Fri, 19 Jun 2026 16:28:10 -0700 Subject: [PATCH] fix(otel): Fix span emission for cross-invocation operations and replay --- .../examples/otel/OtelXRayMapExample.java | 44 ++++ .../otel/OtelXRayNestedContextExample.java | 40 ++++ .../otel/OtelXRayParallelExample.java | 48 +++++ .../examples/otel/OtelXRayCloudTest.java | 52 +++++ .../otel/OpenTelemetryDurablePlugin.java | 52 ++++- .../otel/OpenTelemetryDurablePluginTest.java | 192 ++++++++++++++++++ .../durable/OtelPluginIntegrationTest.java | 182 +++++++++++++++++ .../operation/BaseDurableOperation.java | 28 ++- .../operation/ChildContextOperation.java | 6 + 9 files changed, 624 insertions(+), 20 deletions(-) create mode 100644 examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java create mode 100644 examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayNestedContextExample.java create mode 100644 examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayParallelExample.java create mode 100644 examples/src/test/java/software/amazon/lambda/durable/examples/otel/OtelXRayCloudTest.java diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java new file mode 100644 index 000000000..e09e16d11 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayMapExample.java @@ -0,0 +1,44 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.otel; + +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; + +/** + * OTel + X-Ray example: map operation that processes items concurrently. + * + *

Exercises map operation tracing — verifies spans for map operation + each item step. + */ +public class OtelXRayMapExample extends DurableHandler { + + @Override + protected DurableConfig createConfiguration() { + var otlpExporter = OtlpGrpcSpanExporter.getDefault(); + var otelPlugin = new OpenTelemetryDurablePlugin( + SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter))); + return DurableConfig.builder().withPlugins(otelPlugin).build(); + } + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + context.getLogger().info("Starting OTel X-Ray map example for {}", input.getName()); + + var items = List.of("alpha", "beta", "gamma"); + var result = context.map( + "process-items", + items, + String.class, + (item, index, childCtx) -> + childCtx.step("transform-" + item, String.class, stepCtx -> item.toUpperCase())); + + return "Mapped " + result.succeeded().size() + " items"; + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayNestedContextExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayNestedContextExample.java new file mode 100644 index 000000000..7f385c6fc --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayNestedContextExample.java @@ -0,0 +1,40 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.otel; + +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; + +/** + * OTel + X-Ray example: nested child contexts with inner steps. + * + *

Exercises nested context tracing — verifies span hierarchy for outer → inner → step. + */ +public class OtelXRayNestedContextExample extends DurableHandler { + + @Override + protected DurableConfig createConfiguration() { + var otlpExporter = OtlpGrpcSpanExporter.getDefault(); + var otelPlugin = new OpenTelemetryDurablePlugin( + SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter))); + return DurableConfig.builder().withPlugins(otelPlugin).build(); + } + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + context.getLogger().info("Starting OTel X-Ray nested context example for {}", input.getName()); + + return context.runInChildContext("outer", String.class, outerCtx -> { + var intermediate = outerCtx.step("outer-step", String.class, stepCtx -> "Hello, " + input.getName()); + return outerCtx.runInChildContext("inner", String.class, innerCtx -> { + return innerCtx.step("deep-step", String.class, stepCtx -> intermediate.toUpperCase() + "!"); + }); + }); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayParallelExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayParallelExample.java new file mode 100644 index 000000000..45f10e871 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/otel/OtelXRayParallelExample.java @@ -0,0 +1,48 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.otel; + +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; + +/** + * OTel + X-Ray example: parallel operation with multiple branches. + * + *

Exercises parallel operation tracing — verifies spans for parallel + branch steps. + */ +public class OtelXRayParallelExample extends DurableHandler { + + @Override + protected DurableConfig createConfiguration() { + var otlpExporter = OtlpGrpcSpanExporter.getDefault(); + var otelPlugin = new OpenTelemetryDurablePlugin( + SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter))); + return DurableConfig.builder().withPlugins(otelPlugin).build(); + } + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + context.getLogger().info("Starting OTel X-Ray parallel example for {}", input.getName()); + + var parallel = context.parallel("fan-out"); + try (parallel) { + parallel.branch( + "branch-a", + String.class, + childCtx -> childCtx.step("step-a", String.class, stepCtx -> "A: " + input.getName())); + parallel.branch( + "branch-b", + String.class, + childCtx -> childCtx.step("step-b", String.class, stepCtx -> "B: " + input.getName())); + } + var result = parallel.get(); + + return "Parallel completed: " + result.succeeded() + " branches"; + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/otel/OtelXRayCloudTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/otel/OtelXRayCloudTest.java new file mode 100644 index 000000000..41833b663 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/otel/OtelXRayCloudTest.java @@ -0,0 +1,52 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +/** + * Cloud-like tests for OTel examples using LocalDurableTestRunner. + * + *

These verify that the OTel plugin doesn't break execution for various scenarios. When deployed to Lambda with + * CloudDurableTestRunner, these same scenarios validate that flush/export works correctly under real concurrency. + */ +class OtelXRayCloudTest { + + @Test + void mapExample_executesSuccessfully() { + var handler = new OtelXRayMapExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("test")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("Mapped 3 items", result.getResult(String.class)); + } + + @Test + void parallelExample_executesSuccessfully() { + var handler = new OtelXRayParallelExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("test")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertTrue(result.getResult(String.class).contains("Parallel completed: 2 branches")); + } + + @Test + void nestedContextExample_executesSuccessfully() { + var handler = new OtelXRayNestedContextExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.runUntilComplete(new GreetingRequest("World")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, WORLD!", result.getResult(String.class)); + } +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java index af7e10562..3e71e82ef 100644 --- a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.plugin.DurableExecutionPlugin; import software.amazon.lambda.durable.plugin.InvocationEndInfo; import software.amazon.lambda.durable.plugin.InvocationInfo; @@ -258,16 +259,45 @@ public void onOperationStart(OperationInfo info) { public void onOperationEnd(OperationEndInfo info) { if (info.id() == null) return; - // End the operation span that was started in onOperationStart var span = operationSpans.remove(info.id()); - if (span == null) return; - if (info.error() != null) { - span.setStatus(StatusCode.ERROR, info.error().getMessage()); - span.recordException(info.error()); - } + if (span != null) { + // Operation was started in this invocation — end normally + if (info.error() != null) { + span.setStatus(StatusCode.ERROR, info.error().getMessage()); + span.recordException(info.error()); + } + span.end(); + } else { + // Operation was started in a prior invocation — create a continuation span with Link + // to the deterministic span ID from the original invocation. + var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id()); + var traceId = idGenerator.generateTraceId(); + var linkedSpanContext = + SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault()); + + var parentContext = resolveParentContext(info.parentId()); + + var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name())) + .setParent(parentContext) + .addLink(linkedSpanContext) + .setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn) + .setAttribute(DURABLE_OPERATION_ID, info.id()) + .setAttribute(DURABLE_OPERATION_TYPE, info.type()); - span.end(); + if (info.name() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name()); + } + + var continuationSpan = spanBuilder.startSpan(); + + if (info.error() != null) { + continuationSpan.setStatus(StatusCode.ERROR, info.error().getMessage()); + continuationSpan.recordException(info.error()); + } + + continuationSpan.end(); + } } // ─── User function hooks ───────────────────────────────────────────── @@ -327,8 +357,12 @@ public void onUserFunctionEnd(UserFunctionEndInfo info) { span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome); if (!info.succeeded() && info.error() != null) { - span.setStatus(StatusCode.ERROR, info.error().getMessage()); - span.recordException(info.error()); + if (info.error() instanceof SuspendExecutionException) { + span.setAttribute(DURABLE_ATTEMPT_OUTCOME, "pending"); + } else { + span.setStatus(StatusCode.ERROR, info.error().getMessage()); + span.recordException(info.error()); + } } if (info.endTimestamp() != null) { diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java index b3a2fe4bb..ca143c570 100644 --- a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java @@ -11,6 +11,7 @@ import java.time.Instant; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.plugin.*; class OpenTelemetryDurablePluginTest { @@ -417,4 +418,195 @@ void xrayExtraction_extractedTraceIdMatchesXrayConversion() { var spans = spanExporter.getFinishedSpanItems(); assertEquals(expectedOtelTraceId, spans.get(0).getTraceId()); } + + // ─── Cross-invocation continuation span tests ──────────────────────── + + @Test + void operationEnd_withoutMatchingStart_createsContinuationSpanWithLink() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + // onOperationEnd without a prior onOperationStart — operation completed between invocations + plugin.onOperationEnd( + new OperationEndInfo("op-wait-1", "my-wait", "WAIT", "Wait", null, Instant.now(), Instant.now(), null)); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + var spans = spanExporter.getFinishedSpanItems(); + assertEquals(2, spans.size()); + + var continuationSpan = spans.stream() + .filter(s -> s.getName().contains("wait")) + .findFirst() + .orElseThrow(); + assertEquals("durable.wait:my-wait", continuationSpan.getName()); + assertFalse(continuationSpan.getLinks().isEmpty(), "Continuation span should have a Link"); + } + + @Test + void operationEnd_withoutMatchingStart_withError_setsErrorStatus() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + plugin.onOperationEnd(new OperationEndInfo( + "op-cb-1", + "my-callback", + "CALLBACK", + "Callback", + null, + Instant.now(), + Instant.now(), + new RuntimeException("timed out"))); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + var continuationSpan = spanExporter.getFinishedSpanItems().stream() + .filter(s -> s.getName().contains("callback")) + .findFirst() + .orElseThrow(); + assertEquals(StatusCode.ERROR, continuationSpan.getStatus().getStatusCode()); + assertFalse(continuationSpan.getLinks().isEmpty()); + } + + // ─── SuspendExecutionException handling ────────────────────────────── + + @Test + void userFunctionEnd_withSuspendException_setsOutcomePending() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + plugin.onUserFunctionStart(new UserFunctionStartInfo( + "op-1", "child-ctx", "CONTEXT", "RunInChildContext", null, Instant.now(), false, null)); + + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", + "child-ctx", + "CONTEXT", + "RunInChildContext", + null, + Instant.now(), + Instant.now(), + false, + null, + false, + new SuspendExecutionException())); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.PENDING, null)); + + var attemptSpan = spanExporter.getFinishedSpanItems().stream() + .filter(s -> s.getName().contains("child-ctx")) + .findFirst() + .orElseThrow(); + // Should NOT have ERROR status — suspension is not an error + assertNotEquals(StatusCode.ERROR, attemptSpan.getStatus().getStatusCode()); + } + + // ─── Attempt span cleanup at invocation end ────────────────────────── + + @Test + void attemptSpan_endedAtInvocationEnd_whenUserFunctionEndNotCalled() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + // Start attempt but never call onUserFunctionEnd (simulates crash before end hook) + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "running", "STEP", "Step", null, Instant.now(), false, 1)); + + // Invocation ends — attempt span should be cleaned up + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.PENDING, null)); + + var spans = spanExporter.getFinishedSpanItems(); + var attemptSpan = spans.stream() + .filter(s -> s.getName().contains("running")) + .findFirst() + .orElseThrow(); + assertNotNull(attemptSpan, "Attempt span should be exported even without onUserFunctionEnd"); + } + + // ─── Parent resolution with parentId ───────────────────────────────── + + @Test + void childOperation_parentedToParentOperationSpan() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + // Parent context operation + plugin.onOperationStart(new OperationInfo( + "op-parent", "my-context", "CONTEXT", "RunInChildContext", null, Instant.now(), null)); + + // Child operation with parentId pointing to parent + plugin.onOperationStart( + new OperationInfo("op-child", "inner-step", "STEP", "Step", "op-parent", Instant.now(), null)); + plugin.onOperationEnd(new OperationEndInfo( + "op-child", "inner-step", "STEP", "Step", "op-parent", Instant.now(), Instant.now(), null)); + + plugin.onOperationEnd(new OperationEndInfo( + "op-parent", "my-context", "CONTEXT", "RunInChildContext", null, Instant.now(), Instant.now(), null)); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + var spans = spanExporter.getFinishedSpanItems(); + + var parentSpan = spans.stream() + .filter(s -> s.getName().contains("context")) + .findFirst() + .orElseThrow(); + var childSpan = spans.stream() + .filter(s -> s.getName().contains("inner-step")) + .findFirst() + .orElseThrow(); + + assertEquals( + parentSpan.getSpanId(), + childSpan.getParentSpanId(), + "Child operation should be parented to parent operation span"); + } + + // ─── Multi-invocation step-wait-step scenario ──────────────────────── + + @Test + void multiInvocation_stepWaitStep_producesCorrectSpans() { + var arn = "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1"; + + // Invocation 1: step completes, wait starts + plugin.onInvocationStart(new InvocationInfo("req-1", arn, true)); + plugin.onOperationStart(new OperationInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), null)); + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), false, 1)); + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", "step-A", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + plugin.onOperationEnd( + new OperationEndInfo("op-1", "step-A", "STEP", "Step", null, Instant.now(), Instant.now(), null)); + plugin.onOperationStart(new OperationInfo("op-2", "pause", "WAIT", "Wait", null, Instant.now(), null)); + plugin.onInvocationEnd(new InvocationEndInfo("req-1", arn, true, InvocationStatus.PENDING, null)); + + // Invocation 1 should have: step op + step attempt + wait (PENDING) + invocation = 4 + assertEquals(4, spanExporter.getFinishedSpanItems().size()); + var inv1TraceId = spanExporter.getFinishedSpanItems().get(0).getTraceId(); + + spanExporter.reset(); + + // Invocation 2: wait completed between invocations, new step runs + plugin.onInvocationStart(new InvocationInfo("req-2", arn, false)); + plugin.onOperationEnd( + new OperationEndInfo("op-2", "pause", "WAIT", "Wait", null, Instant.now(), Instant.now(), null)); + plugin.onOperationStart(new OperationInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), null)); + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), false, 1)); + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-3", "step-B", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + plugin.onOperationEnd( + new OperationEndInfo("op-3", "step-B", "STEP", "Step", null, Instant.now(), Instant.now(), null)); + plugin.onInvocationEnd(new InvocationEndInfo("req-2", arn, false, InvocationStatus.SUCCEEDED, null)); + + var inv2Spans = spanExporter.getFinishedSpanItems(); + // wait continuation + step-B op + step-B attempt + invocation = 4 + assertEquals(4, inv2Spans.size()); + + // Same trace ID across invocations + var inv2TraceId = inv2Spans.get(0).getTraceId(); + assertEquals(inv1TraceId, inv2TraceId); + + // Wait continuation should have a Link + var waitContinuation = inv2Spans.stream() + .filter(s -> s.getName().contains("wait")) + .findFirst() + .orElseThrow(); + assertFalse(waitContinuation.getLinks().isEmpty()); + } } diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java index 3b1509382..5c9cc4b97 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test; import software.amazon.lambda.durable.config.StepConfig; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; import software.amazon.lambda.durable.retry.RetryStrategies; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; @@ -331,4 +332,185 @@ private static void assertSpanExists(List spans, String expectedName) "Expected span '" + expectedName + "' not found. Got: " + spans.stream().map(SpanData::getName).toList()); } + + // ─── Additional scenario tests ────────────────────────────────────── + + @Test + void callback_producesSpansInFirstInvocation() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + var result = ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {}); + return result; + }, + otelConfig); + + // First invocation: callback starts, suspends + var result1 = runner.run("input"); + assertEquals(ExecutionStatus.PENDING, result1.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + assertTrue(spans.size() >= 2, "Should have invocation + callback spans"); + + // Verify callback operation span exists + assertTrue( + spans.stream().anyMatch(s -> s.getName().contains("approval")), + "Should have callback span. Got: " + + spans.stream().map(SpanData::getName).toList()); + } + + @Test + void mapOperation_producesSpansForEachItem() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + ctx.map( + "batch", + List.of("a", "b", "c"), + String.class, + (item, index, childCtx) -> + childCtx.step("process-" + item, String.class, stepCtx -> item.toUpperCase())); + return "done"; + }, + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // Should have: invocation + map operation + map attempt + item steps + item attempts + assertTrue(spans.size() >= 9, "Expected at least 9 spans for map, got " + spans.size()); + + assertTrue( + spans.stream().anyMatch(s -> s.getName().contains("batch")), + "Should have map operation span. Got: " + + spans.stream().map(SpanData::getName).toList()); + } + + @Test + void parallelOperation_producesSpansForEachBranch() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + var parallel = ctx.parallel("fan-out"); + try (parallel) { + parallel.branch( + "branch-A", + String.class, + childCtx -> childCtx.step("step-A", String.class, stepCtx -> "A")); + parallel.branch( + "branch-B", + String.class, + childCtx -> childCtx.step("step-B", String.class, stepCtx -> "B")); + } + parallel.get(); + return "done"; + }, + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + assertTrue(spans.size() >= 7, "Expected at least 7 spans for parallel, got " + spans.size()); + + assertTrue( + spans.stream().anyMatch(s -> s.getName().contains("fan-out")), + "Should have parallel operation span. Got: " + + spans.stream().map(SpanData::getName).toList()); + assertSpanExists(spans, "durable.step:step-A"); + assertSpanExists(spans, "durable.step:step-B"); + } + + @Test + void nestedChildContext_producesCorrectHierarchy() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.runInChildContext("outer", String.class, outerCtx -> { + return outerCtx.runInChildContext("inner", String.class, innerCtx -> { + return innerCtx.step("deep-step", String.class, stepCtx -> "deep"); + }); + }), + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + assertSpanExists(spans, "durable.runinchildcontext:outer"); + assertSpanExists(spans, "durable.runinchildcontext:inner"); + assertSpanExists(spans, "durable.step:deep-step"); + } + + @Test + void multipleWaits_producesSpansAcrossMultipleInvocations() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + ctx.wait("wait-A", Duration.ofMinutes(1)); + ctx.wait("wait-B", Duration.ofMinutes(1)); + return "done"; + }, + otelConfig); + + // Invocation 1: wait-A starts, suspends + var result1 = runner.run("input"); + assertEquals(ExecutionStatus.PENDING, result1.getStatus()); + + runner.advanceTime(); + + // Invocation 2: wait-A completed, wait-B starts, suspends + var result2 = runner.run("input"); + assertEquals(ExecutionStatus.PENDING, result2.getStatus()); + + runner.advanceTime(); + + // Invocation 3: wait-B completed, execution succeeds + var result3 = runner.run("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result3.getStatus()); + + var allSpans = spanExporter.getFinishedSpanItems(); + + // Should have 3 invocation spans + var invocationSpans = allSpans.stream() + .filter(s -> s.getName().equals("durable.invocation")) + .toList(); + assertEquals(3, invocationSpans.size(), "Should have 3 invocation spans"); + + // Should have wait-A and wait-B spans + assertTrue( + allSpans.stream().anyMatch(s -> s.getName().equals("durable.wait:wait-A")), "Should have wait-A span"); + assertTrue( + allSpans.stream().anyMatch(s -> s.getName().equals("durable.wait:wait-B")), "Should have wait-B span"); + } + + @Test + void waitForCondition_producesSpansWithAttempts() { + var pollCount = new AtomicInteger(0); + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + ctx.waitForCondition("check-ready", String.class, (state, stepCtx) -> { + if (pollCount.incrementAndGet() >= 3) { + return WaitForConditionResult.stopPolling("ready"); + } + return WaitForConditionResult.continuePolling("not-yet"); + }); + return "done"; + }, + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + assertTrue( + spans.stream().anyMatch(s -> s.getName().contains("check-ready")), + "Should have waitForCondition span. Got: " + + spans.stream().map(SpanData::getName).toList()); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index a4b26d8a5..4511d2d8a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -127,7 +127,9 @@ public OperationType getType() { */ public void execute() { if (isVirtual) { - // We never persist virtual operations, so always call start + // Virtual operations are not checkpointed, but we still fire plugin hooks + // so the OTel plugin can emit spans for map/parallel iterations. + fireOnOperationStart(null); start(); } else { var existing = getOperation(); @@ -136,12 +138,17 @@ public void execute() { validateReplay(existing); if (ExecutionManager.isTerminalStatus(existing.status())) { replayCompletedOperation.set(true); + } else if (getType() == OperationType.STEP || getType() == OperationType.CONTEXT) { + // Non-terminal STEP/CONTEXT operations are being re-executed (user code runs again). + // Fire onOperationStart so the OTel plugin can create a parent span for attempt spans. + fireOnOperationStart(existing); } - // Fire onOperationStart plugin hook (including replay) - fireOnOperationStart(existing); + // WAIT/INVOKE/CALLBACK in non-terminal status just poll — no onOperationStart needed. + // They'll get a continuation span via onOperationEnd when they complete. // Fire onOperationEnd for operations that completed during suspension (between invocations). - // This enables the OTel plugin to emit spans for operations that transitioned while Lambda was frozen. + // The OTel plugin handles the missing onOperationStart by creating a continuation span linked + // to the deterministic span ID from the original invocation. if (replayCompletedOperation.get() && executionManager.isOperationUpdatedSinceLastInvocation(getOperationId())) { fireOnOperationEnd(existing, extractErrorFromOperation(existing)); @@ -290,12 +297,11 @@ protected void runUserHandler(Runnable runnable, ThreadType threadType, Integer pluginRunner.onUserFunctionEnd( PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, true, null)); } catch (Throwable throwable) { - // Fire onUserFunctionEnd for actual user function failures, - // not for SDK control flow signals (SuspendExecutionException) - if (!(throwable instanceof SuspendExecutionException)) { - pluginRunner.onUserFunctionEnd( - PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, false, throwable)); - } + // Fire onUserFunctionEnd for all outcomes including suspension. + // For SuspendExecutionException, this allows the OTel plugin to mark the attempt span as PENDING + // rather than leaving it with no status at invocation cleanup. + pluginRunner.onUserFunctionEnd( + PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, false, throwable)); // Operations always wrap the user's function and handles all possible exceptions except for // SuspendExecutionException. @@ -520,7 +526,7 @@ private void fireOnOperationStart(Operation existing) { } /** Fires onOperationEnd plugin hook when an operation reaches terminal status for the first time. */ - private void fireOnOperationEnd(Operation operation, Throwable error) { + protected void fireOnOperationEnd(Operation operation, Throwable error) { var info = PluginInfoConverter.toOperationEndInfo( operation, operationIdentifier, durableContext.getParentId(), error); getPluginRunner().onOperationEnd(info); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index acdda283b..a47aff182 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -157,6 +157,9 @@ private void handleChildContextSuccess(T result) { // - nestingType is FLAT // Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response. cachedOperationResult.set(DeserializedOperationResult.succeeded(result)); + if (isVirtual) { + fireOnOperationEnd(null, null); + } markAlreadyCompleted(); } else { checkpointSuccess(result); @@ -207,6 +210,9 @@ private void handleChildContextFailure(Throwable exception) { // the parent has already succeeded. // - this child is not a direct child of a parent context (i.e. nestingType == FLAT), such as a parallel branch. if ((parentOperation != null && parentOperation.isOperationCompleted()) || isVirtual) { + if (isVirtual) { + fireOnOperationEnd(null, exception); + } markAlreadyCompleted(); return; }