Observability

The digitalNXT Agency framework combines three tools to make AI agent executions observable: Langfuse for LLM-specific telemetry, OpenTelemetry for distributed tracing, and Temporal UI for workflow visibility.

Observability Stack Overview

graph TB
    subgraph "Application Layer"
        Agent[AI Agents]
        Strategy[Strategies]
        Workflow[Temporal Workflows]
    end

    subgraph "Instrumentation Layer"
        Processor[Processor with Spans]
        OTelClient[OpenTelemetry Client]
        LangfuseClient[Langfuse Client]
    end

    subgraph "Collection Layer"
        OTelCollector[OpenTelemetry Collector]
    end

    subgraph "Storage & Analysis"
        LocalLF[Local Langfuse<br/>Development]
        RemoteLF[Remote Langfuse<br/>Production]
        TemporalUI[Temporal UI<br/>Workflow Visibility]
    end

    Agent --> Processor
    Strategy --> Processor
    Workflow --> Processor

    Processor --> OTelClient
    Processor --> LangfuseClient
    OTelClient --> OTelCollector
    LangfuseClient --> OTelCollector

    OTelCollector -->|Development| LocalLF
    OTelCollector -->|Production| RemoteLF
    Workflow -->|Direct| TemporalUI

    classDef app fill:#e3f2fd
    classDef instrument fill:#fff3e0
    classDef collect fill:#e8f5e8
    classDef storage fill:#fce4ec

    class Agent,Strategy,Workflow app
    class Processor,OTelClient,LangfuseClient instrument
    class OTelCollector collect
    class LocalLF,RemoteLF,TemporalUI storage

Three Layers of Observability

🧠 1. LLM-Specific Observability (Langfuse)

Langfuse provides specialized observability for Large Language Model interactions:

  • Token Usage Tracking: Monitor costs and consumption
  • Latency Analysis: Identify slow model calls
  • Quality Scoring: Evaluate response quality
  • Session Tracking: Follow user conversations
  • Prompt Management: Version control for prompts

🌐 2. Distributed Tracing (OpenTelemetry)

OpenTelemetry provides standard distributed tracing capabilities:

  • Trace Correlation: Connect related operations
  • Service Dependencies: Map service interactions
  • Performance Metrics: Measure response times
  • Error Tracking: Identify and debug failures
  • Resource Attribution: Track compute usage

🔄 3. Workflow Visibility (Temporal UI)

Temporal UI provides workflow-specific insights:

  • Execution History: Complete workflow timeline
  • Activity Status: Real-time progress tracking
  • Retry Patterns: Failed attempts and recovery
  • Performance Metrics: Workflow duration analysis
  • Task Queue Health: Worker pool monitoring

Environment-based Configuration

The observability system automatically routes traces based on your environment:

🏠 Local Development

Perfect for development and debugging:

# Local Langfuse (default for development)
processor = Processor(
    service_name="Development Service",
    langfuse_mode=LangfuseMode.LOCAL
)

# Access points:
# - Langfuse UI: http://localhost:3000
# - Temporal UI: http://localhost:8080
# - OpenTelemetry Collector metrics: http://localhost:8888/metrics

Local Stack Components:

  • Langfuse: Complete LLM observability at localhost:3000
  • Temporal: Workflow orchestration at localhost:7233 (UI: localhost:8080)
  • PostgreSQL: Data persistence (Langfuse: 5432, Temporal: 5433)
  • OpenTelemetry Collector: Trace routing at localhost:4317/4318

🌐 Remote/Production

Scalable setup for production workloads:

# Remote Langfuse (production)
processor = Processor(
    service_name="Production Customer Service",
    langfuse_mode=LangfuseMode.REMOTE
)

Required Environment Variables:

LANGFUSE_PUBLIC_KEY=your_public_key
LANGFUSE_SECRET_KEY=your_secret_key
LANGFUSE_HOST=https://your-langfuse-instance.com
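
A startup check can catch a missing variable before any trace is silently dropped. The following is a minimal sketch, not part of the framework: `missing_langfuse_vars` is a hypothetical helper, and it assumes that an empty value should count as missing.

```python
import os
from typing import Mapping

# The three variables listed above as required for remote mode.
REQUIRED_LANGFUSE_VARS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST")


def missing_langfuse_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or blank (hypothetical helper)."""
    return [name for name in REQUIRED_LANGFUSE_VARS if not env.get(name, "").strip()]


missing = missing_langfuse_vars()
if missing:
    print("Remote Langfuse is not fully configured; missing:", ", ".join(missing))
else:
    print("Remote Langfuse configuration looks complete.")
```

Running a check like this before constructing the `Processor` makes a misconfigured deployment fail loudly instead of falling back without notice.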

🔄 Auto Mode (Smart Detection)

Automatically chooses the best available option:

# Auto mode (tries local first, then remote, fallback to basic logging)
processor = Processor(
    langfuse_mode=LangfuseMode.AUTO  # Default
)

Decision Flow:

graph TD
    Start[Start Application] --> CheckLocal{Local Langfuse<br/>Available?}
    CheckLocal -->|Yes| UseLocal[✅ Use Local Langfuse<br/>Perfect for Development]
    CheckLocal -->|No| CheckRemote{Remote Config<br/>Available?}
    CheckRemote -->|Yes| UseRemote[🌐 Use Remote Langfuse<br/>Production Ready]
    CheckRemote -->|No| UseBasic[📝 Use Basic Logging<br/>Minimal Observability]

    UseLocal --> Success[🎯 Full Observability]
    UseRemote --> Success
    UseBasic --> Limited[📋 Limited Observability]

    classDef success fill:#e8f5e8
    classDef limited fill:#fce4ec
    classDef decision fill:#fff3e0

    class UseLocal,UseRemote,Success success
    class UseBasic,Limited limited
    class CheckLocal,CheckRemote decision
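
The decision flow above can be sketched as a small pure function. This is an illustration under stated assumptions, not the framework's implementation: `Mode` is a stand-in enum (not `LangfuseMode`), `resolve_auto_mode` is a hypothetical name, and "remote config available" is assumed to mean that all three `LANGFUSE_*` variables are set.

```python
from enum import Enum
from typing import Callable, Mapping


class Mode(Enum):
    # Stand-in for the outcome of AUTO detection; not the framework's LangfuseMode.
    LOCAL = "local"
    REMOTE = "remote"
    BASIC_LOGGING = "basic_logging"


def resolve_auto_mode(local_is_up: Callable[[], bool], env: Mapping[str, str]) -> Mode:
    """Follow the decision flow: local first, then remote, then basic logging."""
    if local_is_up():
        return Mode.LOCAL
    remote_keys = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST")
    if all(env.get(key) for key in remote_keys):
        return Mode.REMOTE
    return Mode.BASIC_LOGGING


# No local instance and no remote credentials: fall back to basic logging.
print(resolve_auto_mode(lambda: False, {}))
```

Passing the local probe in as a callable keeps the decision logic testable without a running Langfuse instance.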

OpenTelemetry Collector Configuration

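The OpenTelemetry Collector receives OTLP traces over gRPC and HTTP, stamps each one with the deployment environment, and exports it to Langfuse. The pipeline below lists both the local and the remote exporter, so every trace is sent to both; to route traces to a single target, list only the exporter that matches the environment: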

📊 Collector Configuration

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024

  resource:
    attributes:
      - key: service.environment
        value: ${env:AGENCY_ENVIRONMENT}
        action: upsert

exporters:
  # Local Langfuse (development)
  # Langfuse accepts OTLP at /api/public/otel; the exporter appends /v1/traces
  otlphttp/local:
    endpoint: http://langfuse:3000/api/public/otel
    headers:
      Authorization: Basic ${env:LANGFUSE_LOCAL_AUTH}

  # Remote Langfuse (production)
  otlphttp/remote:
    endpoint: ${env:LANGFUSE_REMOTE_HOST}/api/public/otel
    headers:
      Authorization: Basic ${env:LANGFUSE_REMOTE_AUTH}

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [resource, batch]
      exporters: [otlphttp/local, otlphttp/remote]

🚀 Docker Compose Integration

# docker-compose.yml (excerpt)
otel-collector:
  image: otel/opentelemetry-collector-contrib:latest
  ports:
    - "4317:4317"   # OTLP gRPC receiver
    - "4318:4318"   # OTLP HTTP receiver
    - "8888:8888"   # Prometheus metrics
  environment:
    - AGENCY_ENVIRONMENT=${AGENCY_ENVIRONMENT:-development}
    - LANGFUSE_LOCAL_AUTH=cGstbGYtMTIzNDU2Nzg5MGFiY2RlZjpzay1sZi0xMjM0NTY3ODkwYWJjZGVmMTIzNDU2Nzg5MGFiY2RlZg==
    - LANGFUSE_REMOTE_HOST=${LANGFUSE_REMOTE_HOST}
    - LANGFUSE_REMOTE_AUTH=${LANGFUSE_REMOTE_AUTH}
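
The `LANGFUSE_LOCAL_AUTH` and `LANGFUSE_REMOTE_AUTH` values are HTTP Basic credentials: the Base64 encoding of the public key and secret key joined by a colon. The sketch below shows one way to produce the value for a remote instance and decodes the local default from the excerpt above; `langfuse_basic_auth` is a hypothetical helper name.

```python
import base64


def langfuse_basic_auth(public_key: str, secret_key: str) -> str:
    """Base64-encode "public_key:secret_key" for a Basic Authorization header."""
    return base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8")).decode("ascii")


# The local default from the compose excerpt decodes back to its key pair.
LOCAL_AUTH = (
    "cGstbGYtMTIzNDU2Nzg5MGFiY2RlZjpzay1sZi0xMjM0NTY3ODkwYWJjZGVm"
    "MTIzNDU2Nzg5MGFiY2RlZg=="
)
public_key, secret_key = base64.b64decode(LOCAL_AUTH).decode("utf-8").split(":", 1)
print("public key:", public_key)
print("round trip matches:", langfuse_basic_auth(public_key, secret_key) == LOCAL_AUTH)
```

The local value is a development placeholder that anyone can decode, so production credentials belong in a secret store rather than in the compose file.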

Trace Correlation

🔗 End-to-End Tracing

A single trace_id links the spans that each component emits:

sequenceDiagram
    participant Client as Client Application
    participant Temporal as Temporal Workflow
    participant Worker as Temporal Worker
    participant Activity as Strategy Activity
    participant Processor as Processor
    participant Langfuse as Langfuse
    participant Agent as AI Agent

    Client->>Temporal: Start Workflow (trace_id: abc123)
    Temporal->>Worker: Schedule Activity
    Worker->>Activity: Execute Strategy

    Activity->>Processor: Create Processor Span
    Note over Processor: Span: "Customer Support - RouterStrategy"<br/>trace_id: abc123<br/>span_id: def456

    Processor->>Agent: Execute Agent
    Agent->>Langfuse: LLM Call Span
    Note over Langfuse: LLM Span: "gpt-4 completion"<br/>parent_span_id: def456<br/>tokens: 1500, cost: $0.03

    Langfuse-->>Agent: Response + Metadata
    Agent-->>Processor: Agent Result
    Processor-->>Activity: Strategy Result
    Activity-->>Worker: Activity Complete
    Worker-->>Temporal: Workflow Complete
    Temporal-->>Client: Final Result

    Note over Client,Agent: All spans connected by trace_id: abc123
    Note over Client,Agent: Temporal workflow_id linked to Langfuse session
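
The linkage in the diagram comes down to two rules: every span in a trace shares the same trace_id, and each child records its parent's span_id. The sketch below models only that relationship with a plain dataclass; `Span` here is a hypothetical stand-in, not the OpenTelemetry or Langfuse span class. The ID sizes follow the OpenTelemetry convention of 16-byte trace IDs and 8-byte span IDs.

```python
import secrets
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Span:
    name: str
    trace_id: str
    parent_span_id: Optional[str] = None
    # 8 random bytes rendered as 16 hex characters, like an OpenTelemetry span ID.
    span_id: str = field(default_factory=lambda: secrets.token_hex(8))

    def child(self, name: str) -> "Span":
        """Create a child span: same trace_id, parent set to this span."""
        return Span(name=name, trace_id=self.trace_id, parent_span_id=self.span_id)


# 16 random bytes rendered as 32 hex characters, like an OpenTelemetry trace ID.
root = Span(name="Customer Support - RouterStrategy", trace_id=secrets.token_hex(16))
llm_call = root.child("gpt-4 completion")

print(llm_call.trace_id == root.trace_id)        # same trace
print(llm_call.parent_span_id == root.span_id)   # linked to its parent
```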

🏷️ Context Propagation

Context flows automatically through all layers:

# Temporal workflow context automatically flows to activities
from typing import Optional

from temporalio import activity

@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    context: Optional[dict] = None
):
    # Copy the context so the caller's dict is not mutated; this also
    # covers the default of context=None
    context = dict(context or {})

    # workflow.info() is only valid inside workflow code. Inside an
    # activity, the workflow identifiers come from activity.info().
    if activity.in_activity():
        info = activity.info()
        context.update({
            "workflow_id": info.workflow_id,
            "workflow_run_id": info.workflow_run_id,
            "execution_context": "temporal_activity",
            "execution_mode": "temporal",
        })

    # This context flows to Langfuse spans
    processor = Processor(
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id")
    )

📊 Rich Span Metadata

Every span includes comprehensive metadata:

# Processor spans include full context
span_metadata = {
    # Agent information
    "agent_name": agent.name,
    "agent_instructions": agent.instructions[:100],

    # Execution context
    "service": self.processor.service_name,
    "execution_mode": "temporal",  # or "direct"
    "strategy_name": context.get("strategy_name"),

    # User/session tracking
    "session_id": self.processor.session_id,
    "user_id": self.processor.user_id,

    # Temporal workflow context (if available)
    "workflow_id": context.get("workflow_id"),
    "workflow_run_id": context.get("workflow_run_id"),

    # Performance metrics
    "execution_time_ms": execution_time * 1000,
    "input_length": len(input_data),
    "output_length": len(result.final_output),
}

Session and User Tracking

👥 User Journey Tracking

Follow users across multiple interactions:

# Create processor with user context
processor = Processor(
    service_name="Customer Support",
    session_id=f"support_session_{user_id}_{timestamp}",
    user_id=user_id,
    langfuse_mode=LangfuseMode.AUTO
)

# All spans will be grouped by session_id in Langfuse
# User patterns are visible across time

📊 Session Analytics in Langfuse

Navigate to Sessions in Langfuse UI to see:

  • User conversation flows
  • Session duration and message counts
  • Cost attribution by user
  • Quality scores by session
  • Error rates by user segment

🔄 Cross-Workflow Session Continuity

Sessions persist across multiple Temporal workflows:

# Workflow 1: Initial inquiry
result1 = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["router", "I need help with billing", {
        "session_id": "user123_session_456",
        "user_id": "user123"
    }],
    id="billing_inquiry_1",
    # task_queue is required; this name is a placeholder for your worker's queue
    task_queue="agency-task-queue",
)

# Workflow 2: Follow-up question (same session)
result2 = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["specialist", "Can you explain the charges?", {
        "session_id": "user123_session_456",  # Same session!
        "user_id": "user123"
    }],
    id="billing_followup_1",
    task_queue="agency-task-queue",  # placeholder, as above
)

# Both workflows appear in same Langfuse session

Performance Monitoring

⚡ Real-time Metrics

Monitor key performance indicators:

# Custom metrics in Temporal workflows
workflow.metric_meter().create_counter("strategies_executed").add(1, {
    "strategy_name": strategy_name,
    "success": "true"
})

workflow.metric_meter().create_histogram("strategy_duration").record(
    execution_time, {
        "strategy_name": strategy_name,
        "user_type": "premium"
    }
)

📈 Langfuse Analytics

Built-in analytics for LLM operations:

  • Token Usage: Track consumption and costs
  • Latency P95/P99: Identify slow operations
  • Error Rates: Monitor failure patterns
  • Quality Scores: Track response quality over time
  • User Segmentation: Analyze usage by customer type
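
To make the P95/P99 figures concrete, the sketch below computes them with the nearest-rank method on made-up latency samples. It illustrates what the percentiles mean, not how Langfuse calculates them internally; `percentile` is a hypothetical helper.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one sample")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based rank into the sorted samples
    return ordered[rank - 1]


# Made-up latencies in milliseconds; one slow outlier dominates the tail.
latencies_ms = [120, 135, 150, 160, 180, 210, 240, 300, 950, 5200]
print("p50:", percentile(latencies_ms, 50))
print("p95:", percentile(latencies_ms, 95))
print("p99:", percentile(latencies_ms, 99))
```

With only ten samples, the single outlier sets both P95 and P99 while the median stays low. That gap between the median and the tail is the reason to track tail percentiles rather than averages.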

📊 Temporal Metrics

Workflow-specific insights:

  • Task Queue Depth: Monitor worker capacity
  • Workflow Success Rates: Track completion rates
  • Activity Retry Patterns: Identify problematic operations
  • Worker Utilization: Optimize resource allocation

Debugging with Traces

🔍 Failure Investigation

When something goes wrong, traces provide complete context:

  1. Start in Temporal UI: Find the failed workflow
  2. Get Workflow Context: Note workflow_id and run_id
  3. Search Langfuse: Use workflow_id to find related spans
  4. Analyze LLM Calls: Review model inputs/outputs
  5. Check Processor Logs: Detailed execution context
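
Step 3 relies on the workflow_id that the processor writes into span metadata. The sketch below shows the correlation on in-memory span records; the dictionary shape and the `spans_for_workflow` helper are assumptions for illustration, not the Langfuse SDK's data model.

```python
from typing import Any, Iterable


def spans_for_workflow(spans: Iterable[dict[str, Any]], workflow_id: str) -> list[dict[str, Any]]:
    """Return spans whose metadata carries the given workflow_id, oldest first."""
    matching = [s for s in spans if s.get("metadata", {}).get("workflow_id") == workflow_id]
    return sorted(matching, key=lambda s: s["start_time"])


# Made-up span records, as they might look after export.
exported_spans = [
    {"name": "gpt-4 completion", "start_time": 2.0, "metadata": {"workflow_id": "billing_inquiry_1"}},
    {"name": "RouterStrategy", "start_time": 1.0, "metadata": {"workflow_id": "billing_inquiry_1"}},
    {"name": "RouterStrategy", "start_time": 1.5, "metadata": {"workflow_id": "billing_followup_1"}},
    {"name": "health check", "start_time": 0.5},  # no metadata at all
]

for span in spans_for_workflow(exported_spans, "billing_inquiry_1"):
    print(span["start_time"], span["name"])
```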

🐛 Common Debugging Patterns

Trace a Specific User Issue

# Search Langfuse by user_id
# Filter spans by session_id
# Follow the complete user journey
user_traces = langfuse.get_traces(user_id="user123")

Find Performance Bottlenecks

# Query slow operations in Langfuse
slow_operations = langfuse.get_spans(
    filter={
        "latency_ms": {"$gt": 5000},
        "service_name": "Customer Support"
    }
)

Correlate Workflow Failures

# Get workflow details from Temporal
workflow_handle = client.get_workflow_handle(workflow_id)
history = await workflow_handle.fetch_history()

# Search related Langfuse spans
langfuse_spans = langfuse.get_spans(
    filter={"metadata.workflow_id": workflow_id}
)

🎯 Error Context

Errors include rich context for debugging:

# Processor error spans include:
span.update(
    input=input_data,
    output=f"Error: {str(exception)}",
    tags=["agent-execution", f"agent-{agent.name}", "error"],
    metadata={
        "error": str(exception),
        "error_type": type(exception).__name__,
        "stack_trace": traceback.format_exc(),
        "workflow_id": context.get("workflow_id"),
        "activity_id": context.get("activity_id"),
        "retry_attempt": context.get("retry_attempt", 0)
    }
)
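
The metadata above can be assembled by a small helper so that every error span carries the same fields. This is a sketch under assumptions: `error_metadata` is a hypothetical function, and the context keys mirror the ones shown above.

```python
import traceback
from typing import Any, Mapping


def error_metadata(exc: BaseException, context: Mapping[str, Any]) -> dict[str, Any]:
    """Build span metadata from an exception and the execution context."""
    return {
        "error": str(exc),
        "error_type": type(exc).__name__,
        # format_exception also works outside an except block, unlike format_exc().
        "stack_trace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        "workflow_id": context.get("workflow_id"),
        "activity_id": context.get("activity_id"),
        "retry_attempt": context.get("retry_attempt", 0),
    }


try:
    raise TimeoutError("model call timed out")
except TimeoutError as exc:
    metadata = error_metadata(exc, {"workflow_id": "billing_inquiry_1"})

print(metadata["error_type"], "-", metadata["error"])
```

Taking the traceback from the exception object means the helper can be called after the `except` block has exited, for example from a shared error handler.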

Local Development Setup

🚀 Quick Start

Start the complete observability stack:

# Start all observability services in the background
docker-compose up -d langfuse langfuse-worker langfuse-postgres \
                     temporal temporal-postgres temporal-web \
                     otel-collector

# Check that the services are up and healthy
docker-compose ps

# Access UIs (open is macOS; use xdg-open on Linux)
open http://localhost:3000          # Langfuse
open http://localhost:8080          # Temporal UI
open http://localhost:8888/metrics  # OpenTelemetry Collector metrics

🔑 Default Credentials

Local Langfuse:

  • URL: http://localhost:3000
  • Email: admin@langfuse.local
  • Password: password

Local Temporal:

  • Server: localhost:7233
  • UI: http://localhost:8080 (no auth required)

🧪 Testing Observability

# Test script to verify observability
import asyncio
from agency.processors.processor import Processor, LangfuseMode

async def test_observability():
    # This should auto-detect local Langfuse
    processor = Processor(
        service_name="Observability Test",
        session_id="test_session_123",
        user_id="test_user"
    )

    print("Service info:", processor.get_service_info())

    # Run a test agent (if you have one configured)
    # result = await processor.run(test_agent, "Hello, world!")
    # print(f"Result: {result}")

asyncio.run(test_observability())

📊 Verify Setup

  1. Check Langfuse: Visit localhost:3000 and verify login
  2. Check Temporal: Visit localhost:8080 and see the namespace
  3. Check Collector: Visit localhost:8888/metrics for OpenTelemetry metrics
  4. Run Test: Execute a simple workflow and verify traces appear

Production Best Practices

🔒 Security

# Use secure credentials
export LANGFUSE_PUBLIC_KEY="pk-lf-prod-..."
export LANGFUSE_SECRET_KEY="sk-lf-prod-..."
export LANGFUSE_HOST="https://production.langfuse.com"

# Use TLS for Temporal
export TEMPORAL_TLS_CERT="/path/to/client.pem"
export TEMPORAL_TLS_KEY="/path/to/client-key.pem"

📈 Performance

# Optimize for high throughput
processor = Processor(
    auto_flush=False,  # Batch flushes for better performance
    langfuse_mode=LangfuseMode.REMOTE
)

# Flush periodically
async def periodic_flush():
    while True:
        await asyncio.sleep(30)  # Flush every 30 seconds
        processor.langfuse_client.flush()
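
A periodic flusher also needs a clean shutdown path, otherwise spans buffered since the last interval are lost when the process exits. The sketch below is one way to handle that, assuming only that the client exposes a blocking `flush()` as in the snippet above; `FakeLangfuseClient` is a stand-in so the example runs without Langfuse.

```python
import asyncio
import contextlib


class FakeLangfuseClient:
    """Stand-in for processor.langfuse_client; it only counts flush() calls."""

    def __init__(self) -> None:
        self.flush_count = 0

    def flush(self) -> None:
        self.flush_count += 1


async def flush_periodically(client, interval_seconds: float) -> None:
    """Flush every interval_seconds, and once more when the task is cancelled."""
    try:
        while True:
            await asyncio.sleep(interval_seconds)
            # flush() may block on network I/O, so keep it off the event loop.
            await asyncio.to_thread(client.flush)
    finally:
        client.flush()  # final flush so shutdown does not drop buffered spans


async def main() -> int:
    client = FakeLangfuseClient()
    task = asyncio.create_task(flush_periodically(client, interval_seconds=0.01))
    await asyncio.sleep(0.05)  # the application does its work here
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return client.flush_count


flush_count = asyncio.run(main())
print("flush() calls:", flush_count)
```

The interval here is shortened so the example finishes quickly; the 30-second interval from the snippet above is the more realistic setting.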

📊 Monitoring

# Set up alerts for key metrics
alerts = [
    {"metric": "workflow_failure_rate", "threshold": 0.05},   # 5%
    {"metric": "average_latency", "threshold": 5000},         # milliseconds (assumed unit)
    {"metric": "error_rate", "threshold": 0.01},              # 1%
    {"metric": "token_usage_daily", "threshold": 1000000}     # tokens per day
]
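
The alert list is only data; something still has to compare it against current values. The sketch below shows a minimal evaluation, assuming an alert fires when the metric is strictly above its threshold and that a metric with no reading counts as zero. `breached_alerts` and the sample readings are hypothetical.

```python
from typing import Mapping

alerts = [
    {"metric": "workflow_failure_rate", "threshold": 0.05},
    {"metric": "average_latency", "threshold": 5000},
    {"metric": "error_rate", "threshold": 0.01},
    {"metric": "token_usage_daily", "threshold": 1000000},
]


def breached_alerts(metrics: Mapping[str, float], rules: list[dict]) -> list[str]:
    """Return the names of metrics whose current value exceeds the threshold."""
    return [
        rule["metric"]
        for rule in rules
        if metrics.get(rule["metric"], 0) > rule["threshold"]
    ]


# Made-up readings: only the workflow failure rate is over its limit.
current = {"workflow_failure_rate": 0.08, "average_latency": 1200, "error_rate": 0.01}
print(breached_alerts(current, alerts))
```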

Troubleshooting

🔧 Common Issues

No Traces Appearing

# Check OpenTelemetry Collector
curl http://localhost:8888/metrics | grep traces

# Check Langfuse connection
curl -H "Authorization: Basic $LANGFUSE_AUTH" \
     http://localhost:3000/api/public/health

# Verify processor configuration (one line: an indented continuation
# inside python -c raises IndentationError)
python -c "from agency.processors.processor import Processor; p = Processor(); print(p.get_service_info())"

Performance Issues

# Disable auto-flush for high-throughput
processor = Processor(auto_flush=False)

# Use batching in OpenTelemetry Collector
# batch:
#   timeout: 5s
#   send_batch_size: 512

Connection Failures

# Test Langfuse connectivity
curl -v http://localhost:3000/api/public/health

# Test Temporal connectivity
grpcurl -plaintext localhost:7233 temporal.api.workflowservice.v1.WorkflowService/GetSystemInfo
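
When curl or grpcurl is not installed, a plain TCP check narrows the problem down to "nothing is listening" versus "the service is up but misbehaving". The sketch below uses only the standard library; the port list comes from the local stack described earlier, and `port_is_open` is a hypothetical helper.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports from the local stack components listed in this guide.
LOCAL_SERVICES = {
    "Langfuse": 3000,
    "Temporal": 7233,
    "Temporal UI": 8080,
    "OTLP gRPC": 4317,
    "OTLP HTTP": 4318,
}

for name, port in LOCAL_SERVICES.items():
    status = "reachable" if port_is_open("localhost", port, timeout=0.5) else "unreachable"
    print(f"{name:12} localhost:{port} -> {status}")
```

An open port only shows that something accepted the connection; the health endpoints above remain the check for whether the service itself is working.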

Observability Benefits

Complete Visibility: Every AI interaction is traced and logged

Performance Insights: Identify bottlenecks and optimize costs

User Journey Tracking: Follow users across multiple sessions

Error Correlation: Quickly debug issues with full context

Production Ready: Scales from development to enterprise

Best Practices

Development: Use local Langfuse for immediate feedback

Production: Use remote Langfuse with proper authentication

Session Tracking: Always include user_id and session_id

Cost Management: Monitor token usage and set alerts

Error Handling: Include rich context in error spans

Performance Considerations

  • Auto-flush adds latency but provides immediate visibility
  • Batching improves performance but delays trace availability
  • High-throughput applications should disable auto-flush
  • Monitor collector resource usage in production