# Observability
The digitalNXT Agency framework provides comprehensive observability through a multi-layered approach combining OpenTelemetry, Langfuse, and Temporal UI for complete visibility into your AI agent executions.
## Observability Stack Overview
```mermaid
graph TB
    subgraph "Application Layer"
        Agent[AI Agents]
        Strategy[Strategies]
        Workflow[Temporal Workflows]
    end

    subgraph "Instrumentation Layer"
        Processor[Processor with Spans]
        OTelClient[OpenTelemetry Client]
        LangfuseClient[Langfuse Client]
    end

    subgraph "Collection Layer"
        OTelCollector[OpenTelemetry Collector]
    end

    subgraph "Storage & Analysis"
        LocalLF[Local Langfuse<br/>Development]
        RemoteLF[Remote Langfuse<br/>Production]
        TemporalUI[Temporal UI<br/>Workflow Visibility]
    end

    Agent --> Processor
    Strategy --> Processor
    Workflow --> Processor

    Processor --> OTelClient
    Processor --> LangfuseClient

    OTelClient --> OTelCollector
    LangfuseClient --> OTelCollector

    OTelCollector -->|Development| LocalLF
    OTelCollector -->|Production| RemoteLF

    Workflow -->|Direct| TemporalUI

    classDef app fill:#e3f2fd
    classDef instrument fill:#fff3e0
    classDef collect fill:#e8f5e8
    classDef storage fill:#fce4ec

    class Agent,Strategy,Workflow app
    class Processor,OTelClient,LangfuseClient instrument
    class OTelCollector collect
    class LocalLF,RemoteLF,TemporalUI storage
```
## Three Layers of Observability
### 1. LLM-Specific Observability (Langfuse)

Langfuse provides specialized observability for Large Language Model interactions (a short SDK sketch follows this list):
- Token Usage Tracking: Monitor costs and consumption
- Latency Analysis: Identify slow model calls
- Quality Scoring: Evaluate response quality
- Session Tracking: Follow user conversations
- Prompt Management: Version control for prompts
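
For orientation, here is a minimal sketch of how these signals map onto the Langfuse Python SDK (assuming a v2-style client; the trace name, model, and token counts are illustrative):

```python
# Minimal sketch with the v2-style Langfuse Python SDK; values are illustrative.
from langfuse import Langfuse

# Reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST from the environment
langfuse = Langfuse()

# Group work under a session and user so journeys can be followed over time
trace = langfuse.trace(
    name="customer-support-turn",
    session_id="support_session_123",
    user_id="user123",
)

# Record one model call; token usage drives cost and consumption analytics
generation = trace.generation(
    name="gpt-4 completion",
    model="gpt-4",
    input="I need help with billing",
    usage={"input": 1200, "output": 300},
)
generation.end(output="Here is a summary of your recent charges...")
```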
### 2. Distributed Tracing (OpenTelemetry)

OpenTelemetry provides standard distributed tracing capabilities (a minimal setup sketch follows this list):
- Trace Correlation: Connect related operations
- Service Dependencies: Map service interactions
- Performance Metrics: Measure response times
- Error Tracking: Identify and debug failures
- Resource Attribution: Track compute usage
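
A minimal, self-contained setup that exports spans to the collector used in this stack (the service name and span attribute are illustrative):

```python
# Minimal OpenTelemetry setup exporting spans to the local collector on port 4317.
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Attach a service name so spans can be attributed to this service
provider = TracerProvider(resource=Resource.create({"service.name": "customer-support"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("router-strategy") as span:
    span.set_attribute("strategy_name", "router")  # appears as a span attribute
```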
### 3. Workflow Visibility (Temporal UI)

Temporal UI provides workflow-specific insights (a programmatic equivalent is sketched after this list):
- Execution History: Complete workflow timeline
- Activity Status: Real-time progress tracking
- Retry Patterns: Failed attempts and recovery
- Performance Metrics: Workflow duration analysis
- Task Queue Health: Worker pool monitoring
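
The execution details Temporal UI displays can also be fetched programmatically with the Python SDK, which is useful in scripts; the workflow ID below is illustrative:

```python
# Fetch the execution status that Temporal UI displays, via the Python SDK.
import asyncio
from temporalio.client import Client

async def main():
    client = await Client.connect("localhost:7233")  # local Temporal from this stack
    handle = client.get_workflow_handle("billing_inquiry_1")  # illustrative workflow ID
    description = await handle.describe()
    print("Status:", description.status)  # e.g. RUNNING, COMPLETED, FAILED
    print("Run ID:", description.run_id)

asyncio.run(main())
```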
## Environment-based Configuration
The observability system automatically routes traces based on your environment:
### Local Development
Perfect for development and debugging:
```python
# Local Langfuse (default for development)
processor = Processor(
    service_name="Development Service",
    langfuse_mode=LangfuseMode.LOCAL
)

# Access points:
# - Langfuse UI: http://localhost:3000
# - Temporal UI: http://localhost:8080
# - OpenTelemetry metrics: http://localhost:8888
```
Local Stack Components:

- Langfuse: Complete LLM observability at `localhost:3000`
- Temporal: Workflow orchestration at `localhost:7233` (UI: `localhost:8080`)
- PostgreSQL: Data persistence (Langfuse: `5432`, Temporal: `5433`)
- OpenTelemetry Collector: Trace routing at `localhost:4317`/`4318`
### Remote/Production
Scalable setup for production workloads:
```python
# Remote Langfuse (production)
processor = Processor(
    service_name="Production Customer Service",
    langfuse_mode=LangfuseMode.REMOTE
)
```
Required Environment Variables:

```bash
LANGFUSE_PUBLIC_KEY=your_public_key
LANGFUSE_SECRET_KEY=your_secret_key
LANGFUSE_HOST=https://your-langfuse-instance.com
```
### Auto Mode (Smart Detection)
Automatically chooses the best available option:
```python
# Auto mode (tries local first, then remote, falls back to basic logging)
processor = Processor(
    langfuse_mode=LangfuseMode.AUTO  # Default
)
```

Decision Flow (a detection sketch follows the diagram):
```mermaid
graph TD
    Start[Start Application] --> CheckLocal{Local Langfuse<br/>Available?}
    CheckLocal -->|Yes| UseLocal[Use Local Langfuse<br/>Perfect for Development]
    CheckLocal -->|No| CheckRemote{Remote Config<br/>Available?}
    CheckRemote -->|Yes| UseRemote[Use Remote Langfuse<br/>Production Ready]
    CheckRemote -->|No| UseBasic[Use Basic Logging<br/>Minimal Observability]

    UseLocal --> Success[Full Observability]
    UseRemote --> Success
    UseBasic --> Limited[Limited Observability]

    classDef success fill:#e8f5e8
    classDef limited fill:#fce4ec
    classDef decision fill:#fff3e0

    class UseLocal,UseRemote,Success success
    class UseBasic,Limited limited
    class CheckLocal,CheckRemote decision
```
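
As a rough illustration of the decision flow above, here is a hypothetical sketch of the detection logic. The health endpoint and environment variable names match those used elsewhere on this page, but `detect_langfuse_mode` itself is not part of the framework API:

```python
# Hypothetical sketch of AUTO-mode detection: local first, then remote, then basic logging.
import os
import urllib.request

def detect_langfuse_mode() -> str:
    # 1. Is a local Langfuse instance responding?
    try:
        with urllib.request.urlopen("http://localhost:3000/api/public/health", timeout=2):
            return "local"
    except OSError:
        pass
    # 2. Is remote Langfuse configured via environment variables?
    if all(os.environ.get(k) for k in ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST")):
        return "remote"
    # 3. Fall back to basic logging with minimal observability.
    return "basic"
```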
## OpenTelemetry Collector Configuration
The OpenTelemetry Collector intelligently routes traces based on environment:
### Collector Configuration
```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
  resource:
    attributes:
      - key: service.environment
        value: ${env:AGENCY_ENVIRONMENT}
        action: upsert

exporters:
  # Local Langfuse (development)
  otlphttp/local:
    endpoint: http://langfuse:3000/api/public/ingestion
    headers:
      Authorization: Basic ${env:LANGFUSE_LOCAL_AUTH}

  # Remote Langfuse (production)
  otlphttp/remote:
    endpoint: ${env:LANGFUSE_REMOTE_HOST}/api/public/ingestion
    headers:
      Authorization: Basic ${env:LANGFUSE_REMOTE_AUTH}

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [resource, batch]
      exporters: [otlphttp/local, otlphttp/remote]
```
### Docker Compose Integration
```yaml
# docker-compose.yml (excerpt)
otel-collector:
  image: otel/opentelemetry-collector-contrib:latest
  ports:
    - "4317:4317"   # OTLP gRPC receiver
    - "4318:4318"   # OTLP HTTP receiver
    - "8888:8888"   # Prometheus metrics
  environment:
    - AGENCY_ENVIRONMENT=${AGENCY_ENVIRONMENT:-development}
    - LANGFUSE_LOCAL_AUTH=cGstbGYtMTIzNDU2Nzg5MGFiY2RlZjpzay1sZi0xMjM0NTY3ODkwYWJjZGVmMTIzNDU2Nzg5MGFiY2RlZg==
    - LANGFUSE_REMOTE_HOST=${LANGFUSE_REMOTE_HOST}
    - LANGFUSE_REMOTE_AUTH=${LANGFUSE_REMOTE_AUTH}
```
## Trace Correlation
### End-to-End Tracing
Traces flow seamlessly across all components:
```mermaid
sequenceDiagram
    participant Client as Client Application
    participant Temporal as Temporal Workflow
    participant Worker as Temporal Worker
    participant Activity as Strategy Activity
    participant Processor as Processor
    participant Langfuse as Langfuse
    participant Agent as AI Agent

    Client->>Temporal: Start Workflow (trace_id: abc123)
    Temporal->>Worker: Schedule Activity
    Worker->>Activity: Execute Strategy
    Activity->>Processor: Create Processor Span

    Note over Processor: Span: "Customer Support - RouterStrategy"<br/>trace_id: abc123<br/>span_id: def456

    Processor->>Agent: Execute Agent
    Agent->>Langfuse: LLM Call Span

    Note over Langfuse: LLM Span: "gpt-4 completion"<br/>parent_span_id: def456<br/>tokens: 1500, cost: $0.03

    Langfuse-->>Agent: Response + Metadata
    Agent-->>Processor: Agent Result
    Processor-->>Activity: Strategy Result
    Activity-->>Worker: Activity Complete
    Worker-->>Temporal: Workflow Complete
    Temporal-->>Client: Final Result

    Note over Client,Agent: All spans connected by trace_id: abc123
    Note over Client,Agent: Temporal workflow_id linked to Langfuse session
```
### Context Propagation
Context flows automatically through all layers:
```python
from temporalio import activity

# Temporal workflow context automatically flows to activities
@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    context: dict | None = None,
):
    context = context or {}

    # Temporal context is added automatically when running inside an activity
    try:
        info = activity.info()
        context.update({
            "workflow_id": info.workflow_id,
            "workflow_run_id": info.workflow_run_id,
            "execution_context": "temporal_activity",
            "execution_mode": "temporal",
        })
    except RuntimeError:
        # Handle non-activity contexts gracefully
        pass

    # This context flows to Langfuse spans
    processor = Processor(
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
    )
```
### Rich Span Metadata
Every span includes comprehensive metadata:
```python
# Processor spans include full context
span_metadata = {
    # Agent information
    "agent_name": agent.name,
    "agent_instructions": agent.instructions[:100],

    # Execution context
    "service": self.processor.service_name,
    "execution_mode": "temporal",  # or "direct"
    "strategy_name": context.get("strategy_name"),

    # User/session tracking
    "session_id": self.processor.session_id,
    "user_id": self.processor.user_id,

    # Temporal workflow context (if available)
    "workflow_id": context.get("workflow_id"),
    "workflow_run_id": context.get("workflow_run_id"),

    # Performance metrics
    "execution_time_ms": execution_time * 1000,
    "input_length": len(input_data),
    "output_length": len(result.final_output),
}
```
## Session and User Tracking
### User Journey Tracking
Follow users across multiple interactions:
```python
# Create a processor with user context
processor = Processor(
    service_name="Customer Support",
    session_id=f"support_session_{user_id}_{timestamp}",
    user_id=user_id,
    langfuse_mode=LangfuseMode.AUTO
)

# All spans will be grouped by session_id in Langfuse
# User patterns are visible across time
```
### Session Analytics in Langfuse

Navigate to Sessions in the Langfuse UI to see (a scoring sketch follows this list):
- User conversation flows
- Session duration and message counts
- Cost attribution by user
- Quality scores by session
- Error rates by user segment
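
Quality scores in these views come from score events attached to traces. A minimal sketch with the v2-style Langfuse SDK (the trace ID, score name, and value are illustrative):

```python
# Attach a quality score to a trace; Langfuse aggregates scores per session.
from langfuse import Langfuse

langfuse = Langfuse()
langfuse.score(
    trace_id="trace-abc123",  # illustrative trace ID from a prior run
    name="quality",
    value=0.9,                # e.g. from an evaluator or user feedback
)
```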
### Cross-Workflow Session Continuity
Sessions persist across multiple Temporal workflows:
```python
# Workflow 1: Initial inquiry
result1 = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["router", "I need help with billing", {
        "session_id": "user123_session_456",
        "user_id": "user123"
    }],
    id="billing_inquiry_1",
    task_queue="agency-task-queue",  # use your worker's task queue
)

# Workflow 2: Follow-up question (same session)
result2 = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["specialist", "Can you explain the charges?", {
        "session_id": "user123_session_456",  # Same session!
        "user_id": "user123"
    }],
    id="billing_followup_1",
    task_queue="agency-task-queue",
)

# Both workflows appear in the same Langfuse session
```
## Performance Monitoring
### Real-time Metrics
Monitor key performance indicators:
```python
# Custom metrics in Temporal workflows
workflow.metric_meter().create_counter("strategies_executed").add(1, {
    "strategy_name": strategy_name,
    "success": "true"
})

workflow.metric_meter().create_histogram("strategy_duration").record(
    execution_time, {
        "strategy_name": strategy_name,
        "user_type": "premium"
    }
)
```
### Langfuse Analytics

Built-in analytics for LLM operations:

- Token Usage: Track consumption and costs
- Latency P95/P99: Identify slow operations
- Error Rates: Monitor failure patterns
- Quality Scores: Track response quality over time
- User Segmentation: Analyze usage by customer type
### Temporal Metrics

Workflow-specific insights (a metrics-endpoint sketch follows this list):

- Task Queue Depth: Monitor worker capacity
- Workflow Success Rates: Track completion rates
- Activity Retry Patterns: Identify problematic operations
- Worker Utilization: Optimize resource allocation
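
These metrics are emitted by the SDK runtime and are typically scraped from a Prometheus endpoint; a sketch of enabling that endpoint with the Python SDK (the bind address is illustrative):

```python
# Expose Temporal SDK metrics on a Prometheus endpoint so task queue and
# worker metrics can be scraped and alerted on.
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

runtime = Runtime(
    telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090"))
)

async def connect_with_metrics() -> Client:
    # Workers built from this client expose metrics at http://localhost:9090/metrics
    return await Client.connect("localhost:7233", runtime=runtime)
```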
## Debugging with Traces
### Failure Investigation
When something goes wrong, traces provide complete context:
1. Start in Temporal UI: Find the failed workflow
2. Get Workflow Context: Note workflow_id and run_id
3. Search Langfuse: Use workflow_id to find related spans
4. Analyze LLM Calls: Review model inputs/outputs
5. Check Processor Logs: Detailed execution context
### Common Debugging Patterns
#### Trace a Specific User Issue

```python
# Search Langfuse by user_id
# Filter spans by session_id
# Follow the complete user journey
user_traces = langfuse.get_traces(user_id="user123")
```
#### Find Performance Bottlenecks

```python
# Query slow operations in Langfuse
slow_operations = langfuse.get_spans(
    filter={
        "latency_ms": {"$gt": 5000},
        "service_name": "Customer Support"
    }
)
```
#### Correlate Workflow Failures

```python
# Get workflow details from Temporal
workflow_handle = client.get_workflow_handle(workflow_id)
history = await workflow_handle.fetch_history()

# Search related Langfuse spans
langfuse_spans = langfuse.get_spans(
    filter={"metadata.workflow_id": workflow_id}
)
```
### Error Context
Errors include rich context for debugging:
```python
# Processor error spans include:
span.update(
    input=input_data,
    output=f"Error: {str(exception)}",
    tags=["agent-execution", f"agent-{agent.name}", "error"],
    metadata={
        "error": str(exception),
        "error_type": type(exception).__name__,
        "stack_trace": traceback.format_exc(),
        "workflow_id": context.get("workflow_id"),
        "activity_id": context.get("activity_id"),
        "retry_attempt": context.get("retry_attempt", 0)
    }
)
```
## Local Development Setup
### Quick Start
Start the complete observability stack:
```bash
# Start all observability services
docker-compose up langfuse langfuse-worker langfuse-postgres \
    temporal temporal-postgres temporal-web \
    otel-collector

# Wait for services to be healthy
docker-compose ps

# Access UIs
open http://localhost:3000  # Langfuse
open http://localhost:8080  # Temporal UI
open http://localhost:8888  # OpenTelemetry metrics
```
### Default Credentials
Local Langfuse:
- URL: http://localhost:3000
- Email: admin@langfuse.local
- Password: password
Local Temporal:
- Server: localhost:7233
- UI: http://localhost:8080 (no auth required)
### Testing Observability
```python
# Test script to verify observability
import asyncio

from agency.processors.processor import Processor, LangfuseMode

async def test_observability():
    # This should auto-detect local Langfuse
    processor = Processor(
        service_name="Observability Test",
        session_id="test_session_123",
        user_id="test_user"
    )

    print("Service info:", processor.get_service_info())

    # Run a test agent (if you have one configured)
    # result = await processor.run(test_agent, "Hello, world!")
    # print(f"Result: {result}")

asyncio.run(test_observability())
```
### Verify Setup

1. Check Langfuse: Visit `localhost:3000` and verify login
2. Check Temporal: Visit `localhost:8080` and see the namespace
3. Check Collector: Visit `localhost:8888/metrics` for OpenTelemetry metrics
4. Run Test: Execute a simple workflow and verify traces appear
## Production Best Practices
### Security
```bash
# Use secure credentials
export LANGFUSE_PUBLIC_KEY="pk-lf-prod-..."
export LANGFUSE_SECRET_KEY="sk-lf-prod-..."
export LANGFUSE_HOST="https://production.langfuse.com"

# Use TLS for Temporal
export TEMPORAL_TLS_CERT="/path/to/client.pem"
export TEMPORAL_TLS_KEY="/path/to/client-key.pem"
```
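
On the Temporal side, the certificate paths above are typically loaded into a TLS config when connecting the client. A sketch with the Python SDK (the server address and namespace are illustrative):

```python
# Connect to Temporal over TLS using the cert paths from the environment above.
import os
from temporalio.client import Client
from temporalio.service import TLSConfig

async def connect_secure() -> Client:
    with open(os.environ["TEMPORAL_TLS_CERT"], "rb") as f:
        client_cert = f.read()
    with open(os.environ["TEMPORAL_TLS_KEY"], "rb") as f:
        client_key = f.read()
    return await Client.connect(
        "your-namespace.tmprl.cloud:7233",  # illustrative server address
        namespace="your-namespace",         # illustrative namespace
        tls=TLSConfig(client_cert=client_cert, client_private_key=client_key),
    )
```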
### Performance
```python
import asyncio

# Optimize for high throughput
processor = Processor(
    auto_flush=False,  # Batch flushes for better performance
    langfuse_mode=LangfuseMode.REMOTE
)

# Flush periodically instead of after every span
async def periodic_flush():
    while True:
        await asyncio.sleep(30)  # Flush every 30 seconds
        processor.langfuse_client.flush()
```
### Monitoring
```python
# Set up alerts for key metrics
alerts = [
    {"metric": "workflow_failure_rate", "threshold": 0.05},
    {"metric": "average_latency", "threshold": 5000},
    {"metric": "error_rate", "threshold": 0.01},
    {"metric": "token_usage_daily", "threshold": 1000000},
]
```
## Troubleshooting
### Common Issues
#### No Traces Appearing
```bash
# Check the OpenTelemetry Collector
curl http://localhost:8888/metrics | grep traces

# Check the Langfuse connection
curl -H "Authorization: Basic $LANGFUSE_AUTH" \
    http://localhost:3000/api/public/health

# Verify processor configuration
python -c "from agency.processors.processor import Processor; p = Processor(); print(p.get_service_info())"
```
#### Performance Issues

```python
# Disable auto-flush for high-throughput scenarios
processor = Processor(auto_flush=False)

# Use batching in the OpenTelemetry Collector:
# batch:
#   timeout: 5s
#   send_batch_size: 512
```
#### Connection Failures

```bash
# Test Langfuse connectivity
curl -v http://localhost:3000/api/public/health

# Test Temporal connectivity
grpcurl -plaintext localhost:7233 temporal.api.workflowservice.v1.WorkflowService/GetSystemInfo
```
## Related Topics
- Temporal Workflows: Workflow orchestration and execution
- Temporal Workers: Worker configuration and management
- Processors: Processor observability integration
- Local Development: Setting up the development environment
## Observability Benefits

- Complete Visibility: Every AI interaction is traced and logged
- Performance Insights: Identify bottlenecks and optimize costs
- User Journey Tracking: Follow users across multiple sessions
- Error Correlation: Quickly debug issues with full context
- Production Ready: Scales from development to enterprise
## Best Practices

- Development: Use local Langfuse for immediate feedback
- Production: Use remote Langfuse with proper authentication
- Session Tracking: Always include user_id and session_id
- Cost Management: Monitor token usage and set alerts
- Error Handling: Include rich context in error spans
## Performance Considerations
- Auto-flush adds latency but provides immediate visibility
- Batching improves performance but delays trace availability
- High-throughput applications should disable auto-flush
- Monitor collector resource usage in production