-
Notifications
You must be signed in to change notification settings - Fork 303
fix(tracing): apply environments to processor spans #1727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
| import base64 | ||
| import os | ||
| import threading | ||
| from typing import Callable, Dict, List, Optional, cast | ||
| from typing import Any, Callable, Dict, List, Optional, cast | ||
|
|
||
| from opentelemetry import context as context_api | ||
| from opentelemetry.context import Context | ||
|
|
@@ -35,7 +35,7 @@ | |
| ) | ||
| from langfuse._client.span_exporter import LangfuseTransformingSpanExporter | ||
| from langfuse._client.span_filter import is_default_export_span, is_langfuse_span | ||
| from langfuse._client.utils import span_formatter | ||
| from langfuse._client.utils import get_string_span_attribute, span_formatter | ||
| from langfuse._task_manager.media_manager import MediaManager | ||
| from langfuse._version import __version__ as langfuse_version | ||
| from langfuse.logger import langfuse_logger | ||
|
|
@@ -74,8 +74,10 @@ | |
| span_exporter: Optional[SpanExporter] = None, | ||
| media_manager: Optional[MediaManager] = None, | ||
| mask_otel_spans: Optional[MaskOtelSpansFunction] = None, | ||
| environment: Optional[str] = None, | ||
| ): | ||
| self.public_key = public_key | ||
| self._environment = environment | ||
| self.blocked_instrumentation_scopes = ( | ||
| blocked_instrumentation_scopes | ||
| if blocked_instrumentation_scopes is not None | ||
|
|
@@ -143,6 +145,7 @@ | |
| def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: | ||
| context = parent_context or context_api.get_current() | ||
| propagated_attributes = _get_propagated_attributes_from_context(context) | ||
| self._apply_default_environment(span=span, attributes=propagated_attributes) | ||
|
|
||
| if propagated_attributes: | ||
| span.set_attributes(propagated_attributes) | ||
|
|
@@ -162,6 +165,33 @@ | |
|
|
||
| return super().on_start(span, parent_context) | ||
|
|
||
| def _apply_default_environment( | ||
| self, *, span: Span, attributes: Dict[str, Any] | ||
| ) -> None: | ||
| """Apply the processor environment to spans without an explicit environment. | ||
|
|
||
| Langfuse-created wrapper spans set ``langfuse.environment`` themselves, and | ||
| ``propagate_attributes(environment=...)`` adds it to the active context for | ||
| request-scoped overrides. Third-party OpenTelemetry spans only pass through | ||
| this processor, so they need the client-level environment applied here when | ||
| neither of those more specific sources is present. | ||
| """ | ||
|
|
||
| if LangfuseOtelSpanAttributes.ENVIRONMENT in attributes: | ||
| return | ||
|
|
||
| environment = getattr(self, "_environment", None) | ||
| if environment is None: | ||
| return | ||
|
|
||
| if ( | ||
| get_string_span_attribute(span, LangfuseOtelSpanAttributes.ENVIRONMENT) | ||
| is not None | ||
| ): | ||
| return | ||
|
|
||
|
Check failure on line 192 in langfuse/_client/span_processor.py
|
||
| attributes[LangfuseOtelSpanAttributes.ENVIRONMENT] = environment | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When two Langfuse clients with different public keys/environments share the default Useful? React with 👍 / 👎. |
||
|
|
||
| def on_end(self, span: ReadableSpan) -> None: | ||
| try: | ||
| # Only export spans that belong to the scoped project | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 In multi-Langfuse-client setups (two
Langfuse(...)instances with differentpublic_keys and differentenvironments on the same TracerProvider), the new_apply_default_environmentin the first-registered processor'son_startwrites its ownlangfuse.environmentonto every span before the second processor runs, then the second processor'sget_string_span_attributeshort-circuit bails out. The wrapper atlangfuse/_client/span.py:134-143then reads that leaked value viaexisting_environment or environment or client._environmentand re-stamps it, soclient_b's exporter ships spans labeled withclient_a's environment. Fix: gate_apply_default_environmentto skip whenis_langfuse_span(span)but not_is_langfuse_project_span(span)— mirroring the isolation already done inon_endand_is_expected_exported_at_start.Extended reasoning...
What the bug is
This PR introduces
_apply_default_environmentinLangfuseSpanProcessor.on_startto stamp the client-levellangfuse.environmentonto third-party OTel spans that do not already carry one. The implementation has no scope check: it runs on every span the TracerProvider sees, including spans created by other Langfuse clients sharing the same global TracerProvider. In multi-client setups this leaks one tenant's environment onto another tenant's spans before any project-scoping filter has a chance to run.Why multi-client setups share a TracerProvider
LangfuseResourceManager._instancesis keyed bypublic_key, so twoLangfuse(public_key=...)calls produce two distinct resource managers. But_init_tracer_provideronly constructs a new TracerProvider whenotel_trace_api.get_tracer_provider()is aProxyTracerProvider— the second client falls into theelsebranch and reuses the first client's provider, then callstracer_provider.add_span_processorto attach its ownLangfuseSpanProcessor. Both processors are now registered on the same provider. This is explicitly supported (see the'multi-project setups'comment atspan_processor.py:198andTestMultiProjectSetupintests/unit/test_otel.py).Step-by-step proof
Setup:
client_b._otel_tracer.start_as_current_spancreates the span. The OTel SDK'sSynchronousMultiSpanProcessor.on_startiterates processors in registration order and calls each one synchronously, beforestart_as_current_spanreturns._get_propagated_attributes_from_contextreturns an empty dict (nopropagate_attributesactive)._apply_default_environmentchecks: (a)ENVIRONMENTnot inattributes✓, (b)self._environment = 'env-a'is not None ✓, (c)get_string_span_attribute(span, ENVIRONMENT)returnsNonebecause nothing has written to the span yet ✓ — so it setsattributes[ENVIRONMENT] = 'env-a'. Thenon_startrunsif propagated_attributes: span.set_attributes(propagated_attributes), writinglangfuse.environment='env-a'onto the span synchronously.attributesdict is still empty, butget_string_span_attribute(span, ENVIRONMENT)now returns'env-a'(just written by proc_A), so_apply_default_environmentearly-returns.env-bis never applied.start_as_current_spanreturns. The wrapperLangfuseObservationWrapper.__init__atlangfuse/_client/span.py:134-143readsexisting_environment = get_string_span_attribute(self._otel_span, ENVIRONMENT)→'env-a'. It then short-circuitsself._environment = existing_environment or environment or self._langfuse_client._environmentto'env-a'and re-stamps that onto the span.on_end:_is_langfuse_project_spancorrectly rejects on proc_A (instrumentation-scopepublic_key='pk-B'≠'pk-A'), but proc_B accepts and exports the span to client B's backend — withlangfuse.environment='env-a', the other tenant's value.Why existing safeguards don't catch this
_is_langfuse_project_spanis only consulted inon_endand_is_expected_exported_at_start. The contamination happens aton_startbefore any project filtering runs. The Resource-levellangfuse.environmentset by_init_tracer_provideris also locked to the first-initialized client (only the first call creates a new TracerProvider with a Resource), so the per-span write was the only mechanism left to carry client B's environment — and the PR breaks that mechanism.Pre-PR behavior was correct
Before this PR, nothing wrote
langfuse.environmentduringon_start.existing_environmentin the wrapper wasNone, so the short-circuit resolved toclient._environment='env-b', correctly. This is a regression introduced by this PR specifically.Worse for third-party spans
For non-Langfuse third-party spans (e.g.
opentelemetry.instrumentation.openai),is_langfuse_span(span)returnsFalse, so theon_endproject filter is skipped entirely and both processors export the span. Both exported copies carrylangfuse.environment='env-a'(whichever processor was registered first wins). This contradicts the PR's stated intent of applying each client's environment to its own third-party spans.How to fix
Mirror the isolation already done in
on_endand_is_expected_exported_at_start: skip_apply_default_environmentwhenis_langfuse_span(span)but not_is_langfuse_project_span(span). This leaves Langfuse-created wrapper spans for their owning processor (the wrapper already applies the env correctly when nothing else has) and leaves third-party spans to whichever processor genuinely owns the surrounding trace context. Equivalently, narrow the apply to non-Langfuse spans only (which matches the docstring's stated intent — "Third-party OpenTelemetry spans only pass through this processor").