Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def _initialize_instance(
span_exporter=span_exporter,
media_manager=self._media_manager,
mask_otel_spans=mask_otel_spans,
environment=environment,
)
tracer_provider.add_span_processor(langfuse_processor)

Expand Down
34 changes: 32 additions & 2 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import base64
import os
import threading
from typing import Callable, Dict, List, Optional, cast
from typing import Any, Callable, Dict, List, Optional, cast

from opentelemetry import context as context_api
from opentelemetry.context import Context
Expand All @@ -35,7 +35,7 @@
)
from langfuse._client.span_exporter import LangfuseTransformingSpanExporter
from langfuse._client.span_filter import is_default_export_span, is_langfuse_span
from langfuse._client.utils import span_formatter
from langfuse._client.utils import get_string_span_attribute, span_formatter
from langfuse._task_manager.media_manager import MediaManager
from langfuse._version import __version__ as langfuse_version
from langfuse.logger import langfuse_logger
Expand Down Expand Up @@ -74,8 +74,10 @@
span_exporter: Optional[SpanExporter] = None,
media_manager: Optional[MediaManager] = None,
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
environment: Optional[str] = None,
):
self.public_key = public_key
self._environment = environment
self.blocked_instrumentation_scopes = (
blocked_instrumentation_scopes
if blocked_instrumentation_scopes is not None
Expand Down Expand Up @@ -143,6 +145,7 @@
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
context = parent_context or context_api.get_current()
propagated_attributes = _get_propagated_attributes_from_context(context)
self._apply_default_environment(span=span, attributes=propagated_attributes)

if propagated_attributes:
span.set_attributes(propagated_attributes)
Expand All @@ -162,6 +165,33 @@

return super().on_start(span, parent_context)

def _apply_default_environment(
self, *, span: Span, attributes: Dict[str, Any]
) -> None:
"""Apply the processor environment to spans without an explicit environment.

Langfuse-created wrapper spans set ``langfuse.environment`` themselves, and
``propagate_attributes(environment=...)`` adds it to the active context for
request-scoped overrides. Third-party OpenTelemetry spans only pass through
this processor, so they need the client-level environment applied here when
neither of those more specific sources is present.
"""

if LangfuseOtelSpanAttributes.ENVIRONMENT in attributes:
return

environment = getattr(self, "_environment", None)
if environment is None:
return

if (
get_string_span_attribute(span, LangfuseOtelSpanAttributes.ENVIRONMENT)
is not None
):
return

Check failure on line 192 in langfuse/_client/span_processor.py

View check run for this annotation

Claude / Claude Code Review

Cross-client environment leakage in multi-Langfuse-client setups

In multi-Langfuse-client setups (two `Langfuse(...)` instances with different `public_key`s and different `environment`s on the same TracerProvider), the new `_apply_default_environment` in the first-registered processor's `on_start` writes its own `langfuse.environment` onto every span before the second processor runs, then the second processor's `get_string_span_attribute` short-circuit bails out. The wrapper at `langfuse/_client/span.py:134-143` then reads that leaked value via `existing_envi
Comment on lines +168 to +192

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 In multi-Langfuse-client setups (two Langfuse(...) instances with different public_keys and different environments on the same TracerProvider), the new _apply_default_environment in the first-registered processor's on_start writes its own langfuse.environment onto every span before the second processor runs, then the second processor's get_string_span_attribute short-circuit bails out. The wrapper at langfuse/_client/span.py:134-143 then reads that leaked value via existing_environment or environment or client._environment and re-stamps it, so client_b's exporter ships spans labeled with client_a's environment. Fix: gate _apply_default_environment to skip when is_langfuse_span(span) but not _is_langfuse_project_span(span) — mirroring the isolation already done in on_end and _is_expected_exported_at_start.

Extended reasoning...

What the bug is

This PR introduces _apply_default_environment in LangfuseSpanProcessor.on_start to stamp the client-level langfuse.environment onto third-party OTel spans that do not already carry one. The implementation has no scope check: it runs on every span the TracerProvider sees, including spans created by other Langfuse clients sharing the same global TracerProvider. In multi-client setups this leaks one tenant's environment onto another tenant's spans before any project-scoping filter has a chance to run.

Why multi-client setups share a TracerProvider

LangfuseResourceManager._instances is keyed by public_key, so two Langfuse(public_key=...) calls produce two distinct resource managers. But _init_tracer_provider only constructs a new TracerProvider when otel_trace_api.get_tracer_provider() is a ProxyTracerProvider — the second client falls into the else branch and reuses the first client's provider, then calls tracer_provider.add_span_processor to attach its own LangfuseSpanProcessor. Both processors are now registered on the same provider. This is explicitly supported (see the 'multi-project setups' comment at span_processor.py:198 and TestMultiProjectSetup in tests/unit/test_otel.py).

Step-by-step proof

Setup:

client_a = Langfuse(public_key='pk-A', secret_key='sk-A', environment='env-a')
client_b = Langfuse(public_key='pk-B', secret_key='sk-B', environment='env-b')

with client_b.start_as_current_observation(name='work') as span:
    ...
  1. client_b._otel_tracer.start_as_current_span creates the span. The OTel SDK's SynchronousMultiSpanProcessor.on_start iterates processors in registration order and calls each one synchronously, before start_as_current_span returns.
  2. proc_A (env-a) runs first. _get_propagated_attributes_from_context returns an empty dict (no propagate_attributes active). _apply_default_environment checks: (a) ENVIRONMENT not in attributes ✓, (b) self._environment = 'env-a' is not None ✓, (c) get_string_span_attribute(span, ENVIRONMENT) returns None because nothing has written to the span yet ✓ — so it sets attributes[ENVIRONMENT] = 'env-a'. Then on_start runs if propagated_attributes: span.set_attributes(propagated_attributes), writing langfuse.environment='env-a' onto the span synchronously.
  3. proc_B (env-b) runs second on the same span. Its own attributes dict is still empty, but get_string_span_attribute(span, ENVIRONMENT) now returns 'env-a' (just written by proc_A), so _apply_default_environment early-returns. env-b is never applied.
  4. start_as_current_span returns. The wrapper LangfuseObservationWrapper.__init__ at langfuse/_client/span.py:134-143 reads existing_environment = get_string_span_attribute(self._otel_span, ENVIRONMENT)'env-a'. It then short-circuits self._environment = existing_environment or environment or self._langfuse_client._environment to 'env-a' and re-stamps that onto the span.
  5. on_end: _is_langfuse_project_span correctly rejects on proc_A (instrumentation-scope public_key='pk-B''pk-A'), but proc_B accepts and exports the span to client B's backend — with langfuse.environment='env-a', the other tenant's value.

Why existing safeguards don't catch this

_is_langfuse_project_span is only consulted in on_end and _is_expected_exported_at_start. The contamination happens at on_start before any project filtering runs. The Resource-level langfuse.environment set by _init_tracer_provider is also locked to the first-initialized client (only the first call creates a new TracerProvider with a Resource), so the per-span write was the only mechanism left to carry client B's environment — and the PR breaks that mechanism.

Pre-PR behavior was correct

Before this PR, nothing wrote langfuse.environment during on_start. existing_environment in the wrapper was None, so the short-circuit resolved to client._environment = 'env-b', correctly. This is a regression introduced by this PR specifically.

Worse for third-party spans

For non-Langfuse third-party spans (e.g. opentelemetry.instrumentation.openai), is_langfuse_span(span) returns False, so the on_end project filter is skipped entirely and both processors export the span. Both exported copies carry langfuse.environment='env-a' (whichever processor was registered first wins). This contradicts the PR's stated intent of applying each client's environment to its own third-party spans.

How to fix

Mirror the isolation already done in on_end and _is_expected_exported_at_start: skip _apply_default_environment when is_langfuse_span(span) but not _is_langfuse_project_span(span). This leaves Langfuse-created wrapper spans for their owning processor (the wrapper already applies the env correctly when nothing else has) and leaves third-party spans to whichever processor genuinely owns the surrounding trace context. Equivalently, narrow the apply to non-Langfuse spans only (which matches the docstring's stated intent — "Third-party OpenTelemetry spans only pass through this processor").

attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = environment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid applying one client's environment to other projects

When two Langfuse clients with different public keys/environments share the default TracerProvider, every registered LangfuseSpanProcessor receives on_start for every span. The first processor now writes its own environment before the Langfuse wrapper/project filter runs, and the owning processor then preserves that already-present langfuse.environment, so spans created by the second project can be exported with the first project's environment. This needs to be gated to spans owned by this processor/project or deferred until export filtering can determine ownership.

Useful? React with 👍 / 👎.


def on_end(self, span: ReadableSpan) -> None:
try:
# Only export spans that belong to the scoped project
Expand Down
94 changes: 93 additions & 1 deletion tests/unit/test_span_processor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from langfuse import propagate_attributes
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.environment_variables import (
LANGFUSE_FLUSH_AT,
LANGFUSE_FLUSH_INTERVAL,
Expand All @@ -18,6 +21,68 @@ def shutdown(self) -> None:
pass


class InMemorySpanExporter(SpanExporter):
def __init__(self) -> None:
self.spans: list[ReadableSpan] = []

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
self.spans.extend(spans)

return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass


def _export_third_party_span(
*,
processor_environment: str,
span_attributes: dict[str, str] | None = None,
resource_attributes: dict[str, str] | None = None,
propagated_environment: str | None = None,
) -> ReadableSpan:
exporter = InMemorySpanExporter()
provider = TracerProvider(
resource=Resource.create(
{"service.name": "test", **(resource_attributes or {})}
)
)
processor = LangfuseSpanProcessor(
public_key="pk-test",
secret_key="sk-test",
base_url="http://localhost:3000",
flush_at=10,
flush_interval=1,
span_exporter=exporter,
environment=processor_environment,
should_export_span=lambda span: True,
)
provider.add_span_processor(processor)

try:
tracer = provider.get_tracer("third-party.instrumentation", "1.0.0")

if propagated_environment is None:
with tracer.start_as_current_span(
"third-party-span", attributes=span_attributes
):
pass
else:
with propagate_attributes(environment=propagated_environment):
with tracer.start_as_current_span(
"third-party-span", attributes=span_attributes
):
pass

provider.force_flush()

assert len(exporter.spans) == 1

return exporter.spans[0]
finally:
provider.shutdown()


def test_span_processor_uses_constructor_flush_settings_without_env(monkeypatch):
monkeypatch.delenv(LANGFUSE_FLUSH_AT, raising=False)
monkeypatch.delenv(LANGFUSE_FLUSH_INTERVAL, raising=False)
Expand All @@ -37,6 +102,33 @@ def test_span_processor_uses_constructor_flush_settings_without_env(monkeypatch)
processor.shutdown()


def test_span_processor_applies_environment_to_third_party_spans():
span = _export_third_party_span(processor_environment="proxy-prod")

assert span.attributes is not None
assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "proxy-prod"


def test_span_processor_prefers_propagated_environment_for_third_party_spans():
span = _export_third_party_span(
processor_environment="proxy-prod",
propagated_environment="staging",
)

assert span.attributes is not None
assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "staging"


def test_span_processor_preserves_explicit_third_party_span_environment():
span = _export_third_party_span(
processor_environment="proxy-prod",
span_attributes={LangfuseOtelSpanAttributes.ENVIRONMENT: "span-env"},
)

assert span.attributes is not None
assert span.attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] == "span-env"


def test_span_processor_uses_env_flush_settings_when_constructor_omits_them(
monkeypatch,
):
Expand Down