Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.otel;

import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.List;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;

/**
* OTel + X-Ray example: map operation that processes items concurrently.
*
* <p>Exercises map operation tracing — verifies spans for map operation + each item step.
*/
public class OtelXRayMapExample extends DurableHandler<GreetingRequest, String> {

@Override
protected DurableConfig createConfiguration() {
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
var otelPlugin = new OpenTelemetryDurablePlugin(
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
return DurableConfig.builder().withPlugins(otelPlugin).build();
}

@Override
public String handleRequest(GreetingRequest input, DurableContext context) {
context.getLogger().info("Starting OTel X-Ray map example for {}", input.getName());

var items = List.of("alpha", "beta", "gamma");
var result = context.map(
"process-items",
items,
String.class,
(item, index, childCtx) ->
childCtx.step("transform-" + item, String.class, stepCtx -> item.toUpperCase()));

return "Mapped " + result.succeeded().size() + " items";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.otel;

import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;

/**
* OTel + X-Ray example: nested child contexts with inner steps.
*
* <p>Exercises nested context tracing — verifies span hierarchy for outer → inner → step.
*/
public class OtelXRayNestedContextExample extends DurableHandler<GreetingRequest, String> {

@Override
protected DurableConfig createConfiguration() {
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
var otelPlugin = new OpenTelemetryDurablePlugin(
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
return DurableConfig.builder().withPlugins(otelPlugin).build();
}

@Override
public String handleRequest(GreetingRequest input, DurableContext context) {
context.getLogger().info("Starting OTel X-Ray nested context example for {}", input.getName());

return context.runInChildContext("outer", String.class, outerCtx -> {
var intermediate = outerCtx.step("outer-step", String.class, stepCtx -> "Hello, " + input.getName());
return outerCtx.runInChildContext("inner", String.class, innerCtx -> {
return innerCtx.step("deep-step", String.class, stepCtx -> intermediate.toUpperCase() + "!");
});
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.otel;

import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;

/**
* OTel + X-Ray example: parallel operation with multiple branches.
*
* <p>Exercises parallel operation tracing — verifies spans for parallel + branch steps.
*/
public class OtelXRayParallelExample extends DurableHandler<GreetingRequest, String> {

@Override
protected DurableConfig createConfiguration() {
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
var otelPlugin = new OpenTelemetryDurablePlugin(
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
return DurableConfig.builder().withPlugins(otelPlugin).build();
}

@Override
public String handleRequest(GreetingRequest input, DurableContext context) {
context.getLogger().info("Starting OTel X-Ray parallel example for {}", input.getName());

var parallel = context.parallel("fan-out");
try (parallel) {
parallel.branch(
"branch-a",
String.class,
childCtx -> childCtx.step("step-a", String.class, stepCtx -> "A: " + input.getName()));
parallel.branch(
"branch-b",
String.class,
childCtx -> childCtx.step("step-b", String.class, stepCtx -> "B: " + input.getName()));
}
var result = parallel.get();

return "Parallel completed: " + result.succeeded() + " branches";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.otel;

import static org.junit.jupiter.api.Assertions.*;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

/**
* Cloud-like tests for OTel examples using LocalDurableTestRunner.
*
* <p>These verify that the OTel plugin doesn't break execution for various scenarios. When deployed to Lambda with
* CloudDurableTestRunner, these same scenarios validate that flush/export works correctly under real concurrency.
*/
class OtelXRayCloudTest {

@Test
void mapExample_executesSuccessfully() {
var handler = new OtelXRayMapExample();
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);

var result = runner.runUntilComplete(new GreetingRequest("test"));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("Mapped 3 items", result.getResult(String.class));
}

@Test
void parallelExample_executesSuccessfully() {
var handler = new OtelXRayParallelExample();
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);

var result = runner.runUntilComplete(new GreetingRequest("test"));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertTrue(result.getResult(String.class).contains("Parallel completed: 2 branches"));
}

@Test
void nestedContextExample_executesSuccessfully() {
var handler = new OtelXRayNestedContextExample();
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);

var result = runner.runUntilComplete(new GreetingRequest("World"));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("HELLO, WORLD!", result.getResult(String.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.plugin.DurableExecutionPlugin;
import software.amazon.lambda.durable.plugin.InvocationEndInfo;
import software.amazon.lambda.durable.plugin.InvocationInfo;
Expand Down Expand Up @@ -258,16 +259,45 @@ public void onOperationStart(OperationInfo info) {
public void onOperationEnd(OperationEndInfo info) {
if (info.id() == null) return;

// End the operation span that was started in onOperationStart
var span = operationSpans.remove(info.id());
if (span == null) return;

if (info.error() != null) {
span.setStatus(StatusCode.ERROR, info.error().getMessage());
span.recordException(info.error());
}
if (span != null) {
// Operation was started in this invocation — end normally
if (info.error() != null) {
span.setStatus(StatusCode.ERROR, info.error().getMessage());
span.recordException(info.error());
}
span.end();
} else {
// Operation was started in a prior invocation — create a continuation span with Link
// to the deterministic span ID from the original invocation.
var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id());
var traceId = idGenerator.generateTraceId();
var linkedSpanContext =
SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault());

var parentContext = resolveParentContext(info.parentId());

var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name()))
.setParent(parentContext)
.addLink(linkedSpanContext)
.setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn)
.setAttribute(DURABLE_OPERATION_ID, info.id())
.setAttribute(DURABLE_OPERATION_TYPE, info.type());

span.end();
if (info.name() != null) {
spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name());
}

var continuationSpan = spanBuilder.startSpan();

if (info.error() != null) {
continuationSpan.setStatus(StatusCode.ERROR, info.error().getMessage());
continuationSpan.recordException(info.error());
}

continuationSpan.end();
}
}

// ─── User function hooks ─────────────────────────────────────────────
Expand Down Expand Up @@ -327,8 +357,12 @@ public void onUserFunctionEnd(UserFunctionEndInfo info) {
span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome);

if (!info.succeeded() && info.error() != null) {
span.setStatus(StatusCode.ERROR, info.error().getMessage());
span.recordException(info.error());
if (info.error() instanceof SuspendExecutionException) {
span.setAttribute(DURABLE_ATTEMPT_OUTCOME, "pending");
} else {
span.setStatus(StatusCode.ERROR, info.error().getMessage());
span.recordException(info.error());
}
}

if (info.endTimestamp() != null) {
Expand Down
Loading
Loading