Skip to content

Processors

The Processor is the execution engine of the digitalNXT Agency framework, responsible for running AI agents with comprehensive observability, error handling, and performance monitoring. It bridges the gap between high-level strategy logic and low-level agent execution.

Core Concepts

⚙️ Execution Engine

The Processor acts as a sophisticated execution wrapper that: - Handles both synchronous and asynchronous agent execution - Temporal Integration: Seamlessly works within Temporal activities for durable execution - Provides comprehensive observability through Langfuse integration - Manages OpenTelemetry instrumentation automatically - Supports SQLiteSession integration for persistent memory - Implements robust error handling and recovery mechanisms

📊 Observability First

Built with observability as a core principle: - Automatic Tracing: Every agent execution gets traced - Performance Monitoring: Execution times and resource usage tracking - Error Correlation: Errors are automatically linked to execution context - Input/Output Logging: Full request/response logging for debugging

Processor Architecture

graph TD
    Processor[⚙️ Processor] --> AsyncExec[🔄 Async Executor]
    Processor --> SyncExec[⏱️ Sync Executor]
    Processor --> LangfuseClient[📈 Langfuse Client]
    Processor --> Logger[📝 Logger]

    AsyncExec --> |async| OpenAIRunner[🧠 OpenAI Runner]
    SyncExec --> |sync| OpenAIRunner

    LangfuseClient --> |local| LocalLangfuse[🏠 Local Langfuse]
    LangfuseClient --> |remote| RemoteLangfuse[🌐 Remote Langfuse]

    subgraph "Observability Stack"
        LocalLangfuse --> Spans[📋 Spans]
        RemoteLangfuse --> Spans
        Logger --> LogEntries[📝 Log Entries]
        Spans --> TraceAnalysis[📊 Trace Analysis]
    end

    subgraph "Configuration Modes"
        AutoMode[🔄 Auto Mode]
        LocalMode[🏠 Local Mode]
        RemoteMode[🌐 Remote Mode]
        DisabledMode[🚫 Disabled Mode]
    end

    Processor --> AutoMode
    Processor --> LocalMode
    Processor --> RemoteMode
    Processor --> DisabledMode

    classDef processorClass fill:#fff3e0
    classDef executorClass fill:#e8f5e8
    classDef observabilityClass fill:#fce4ec
    classDef configClass fill:#e3f2fd

    class Processor processorClass
    class AsyncExec,SyncExec,OpenAIRunner executorClass
    class LangfuseClient,LocalLangfuse,RemoteLangfuse,Logger,Spans,LogEntries,TraceAnalysis observabilityClass
    class AutoMode,LocalMode,RemoteMode,DisabledMode configClass

Processor Components

🔄 Async Executor

Handles asynchronous agent execution with full observability:

class AsyncExecutor:
    async def run_with_langfuse_span(self, agent, input_data, context, **kwargs):
        """Execute agent with Langfuse span for comprehensive tracing."""
        span_name = self._construct_span_name(context, agent)

        with self.processor.langfuse_client.start_as_current_span(name=span_name) as span:
            try:
                result = await Runner.run(agent, input_data, context=context, **kwargs)

                # Update span with execution details
                span.update(
                    input=input_data,
                    output=result.final_output,
                    tags=["agent-execution", f"agent-{agent.name}"],
                    metadata={
                        "agent_name": agent.name,
                        "service": self.processor.service_name,
                        "session_id": self.processor.session_id,
                        "user_id": self.processor.user_id,
                    }
                )
                return result
            except Exception as e:
                # Comprehensive error tracking
                span.update(
                    input=input_data,
                    output=f"Error: {str(e)}",
                    tags=["agent-execution", f"agent-{agent.name}", "error"],
                    metadata={
                        "error": str(e),
                        "error_type": type(e).__name__,
                    }
                )
                raise

⏱️ Sync Executor

Provides synchronous execution capabilities with identical observability features:

class SyncExecutor:
    def run_with_langfuse_span(self, agent, input_data, context, **kwargs):
        """Synchronous version with same observability features."""
        # Identical logic to AsyncExecutor but using Runner.run_sync()
        return Runner.run_sync(agent, input_data, context=context, **kwargs)

Langfuse Integration Modes

🔄 Auto Mode (Default)

Automatically detects the best observability option:

graph TD
    AutoMode[🔄 Auto Mode] --> CheckLocal{🏠 Local Available?}
    CheckLocal --> |Yes| UseLocal[✅ Use Local Langfuse]
    CheckLocal --> |No| CheckRemote{🌐 Remote Configured?}
    CheckRemote --> |Yes| UseRemote[✅ Use Remote Langfuse]
    CheckRemote --> |No| UseBasic[📝 Use Basic Logging]

    UseLocal --> Success[🎯 Full Observability]
    UseRemote --> Success
    UseBasic --> Limited[📋 Limited Observability]

    classDef autoClass fill:#e3f2fd
    classDef checkClass fill:#fff3e0
    classDef successClass fill:#e8f5e8
    classDef limitedClass fill:#fce4ec

    class AutoMode autoClass
    class CheckLocal,CheckRemote checkClass
    class UseLocal,UseRemote,Success successClass
    class UseBasic,Limited limitedClass

🏠 Local Mode

Forces connection to local Langfuse instance (ideal for development):

# Local Langfuse defaults
LOCAL_LANGFUSE_CONFIG = {
    "host": "http://localhost:3000",
    "public_key": "pk-lf-1234567890abcdef",
    "secret_key": "sk-lf-1234567890abcdef1234567890abcdef"
}

processor = Processor(langfuse_mode=LangfuseMode.LOCAL)

🌐 Remote Mode

Uses production Langfuse from environment variables:

# Required environment variables
LANGFUSE_PUBLIC_KEY=your_public_key
LANGFUSE_SECRET_KEY=your_secret_key
LANGFUSE_HOST=https://your-langfuse-instance.com

processor = Processor(langfuse_mode=LangfuseMode.REMOTE)

🚫 Disabled Mode

Completely disables observability for maximum performance:

processor = Processor(langfuse_mode=LangfuseMode.DISABLED)

Processor Configuration

🔧 Basic Configuration

from agency.processors.processor import Processor, LangfuseMode

# Minimal configuration
processor = Processor()

# Full configuration
processor = Processor(
    service_name="Customer Support Service",
    enable_observability=True,
    langfuse_mode=LangfuseMode.AUTO,
    run_config=RunConfig(),
    auto_flush=True,
    session_id="session_123",
    user_id="user_456",
    logger=custom_logger
)

📊 Advanced Configuration Options

Parameter Type Default Description
service_name str "Agency workflow" Service identifier for tracing
enable_observability bool True Enable/disable observability features
langfuse_mode LangfuseMode AUTO Langfuse connection strategy
run_config RunConfig RunConfig() OpenAI Runner configuration
auto_flush bool True Automatically flush traces after execution
session_id str None Session ID for trace grouping
user_id str None User ID for trace attribution
logger Logger None Custom logger instance

Execution Flow

🔄 Async Execution Flow

sequenceDiagram
    participant Strategy
    participant Processor
    participant AsyncExecutor
    participant LangfuseClient
    participant OpenAIRunner
    participant Agent

    Strategy->>Processor: run(agent, input_data, context)
    Processor->>Processor: Log start: "🚀 Running agent 'AgentName'"

    alt Langfuse Available
        Processor->>AsyncExecutor: run_with_langfuse_span()
        AsyncExecutor->>LangfuseClient: start_as_current_span()
        LangfuseClient-->>AsyncExecutor: span context

        AsyncExecutor->>OpenAIRunner: Runner.run(agent, input, context)
        OpenAIRunner->>Agent: Execute agent logic
        Agent-->>OpenAIRunner: Agent response
        OpenAIRunner-->>AsyncExecutor: Execution result

        AsyncExecutor->>LangfuseClient: span.update(input, output, metadata)

        alt auto_flush enabled
            AsyncExecutor->>LangfuseClient: flush()
        end

        AsyncExecutor-->>Processor: Final result
    else No Langfuse
        Processor->>AsyncExecutor: run_without_langfuse_span()
        AsyncExecutor->>OpenAIRunner: Runner.run(agent, input, context)
        OpenAIRunner-->>AsyncExecutor: Execution result
        AsyncExecutor-->>Processor: Final result
    end

    Processor->>Processor: Log completion: "✅ Agent completed successfully"
    Processor->>Processor: Log result preview: "📝 Result: ..."
    Processor-->>Strategy: Execution result

⏱️ Sync Execution Flow

Identical to async flow but uses Runner.run_sync() instead of Runner.run().

Span Construction and Naming

🏷️ Intelligent Span Naming

The processor constructs meaningful span names based on context:

def _construct_span_name(self, context, agent):
    """Construct contextual span names for better observability."""
    agency_name = context.get("agency_name") if context else None
    strategy_name = context.get("strategy_name") if context else None

    if agency_name and strategy_name:
        return f"{agency_name} - {strategy_name}"
    elif agency_name:
        return f"{agency_name} - Agent"
    elif strategy_name:
        return f"{strategy_name} - Agent"
    else:
        return f"Agent - {agent.name}"

Example Span Names:

  • "Customer Support - RouterStrategy" (Full context)
  • "Customer Support - Agent" (Agency only)
  • "RouterStrategy - Agent" (Strategy only)
  • "Agent - Billing Specialist" (Agent fallback)

📊 Span Metadata

Every span includes comprehensive metadata:

span_metadata = {
    "agent_name": agent.name,
    "service": self.processor.service_name,
    "session_id": self.processor.session_id,
    "user_id": self.processor.user_id,
    # Error cases also include:
    "error": str(exception),
    "error_type": type(exception).__name__
}

Error Handling

🛡️ Comprehensive Error Management

The processor provides robust error handling at multiple levels:

Exception Propagation with Context

try:
    result = await processor.run(agent, input_data, context)
except Exception as e:
    # Processor automatically:
    # 1. Logs the error with agent context
    # 2. Updates Langfuse span with error details
    # 3. Preserves full stack trace
    # 4. Re-raises for higher-level handling
    logger.error(f"Agent execution failed: {e}")
    raise

Error Span Updates

# On error, spans are updated with detailed error information
span.update(
    input=input_data,
    output=f"Error: {str(exception)}",
    tags=["agent-execution", f"agent-{agent.name}", "error"],
    metadata={
        "agent_name": agent.name,
        "service": self.processor.service_name,
        "error": str(exception),
        "error_type": type(exception).__name__,
        "session_id": self.processor.session_id,
        "user_id": self.processor.user_id
    }
)

Performance Features

Optimization Strategies

Auto-Flush Control

# Enable for real-time monitoring (slight performance cost)
processor = Processor(auto_flush=True)

# Disable for high-throughput scenarios
processor = Processor(auto_flush=False)
# Manually flush when needed
processor.langfuse_client.flush()

Connection Reuse

# Processors are designed to be long-lived
# Create once, use many times
processor = Processor(service_name="Production Service")

# Reuse across multiple executions
for input_data in batch_inputs:
    result = await processor.run(agent, input_data, context)

Efficient Logging

# Output preview truncation prevents log bloat
# Long outputs are automatically truncated to 200 characters
self.logger.info(f"📝 Result: {output_preview[:200]}...")

Service Information

📋 Runtime Information

Get comprehensive processor configuration:

info = processor.get_service_info()
print(info)

# Example output:
{
    "service_name": "Customer Support Service",
    "observability_type": "langfuse_local",  # or "langfuse_remote", "basic", "disabled"
    "langfuse_mode": "auto",
    "auto_flush": True,
    "session_id": "session_123",
    "user_id": "user_456"
}

Integration Patterns

🔌 Strategy Integration

Processors are automatically provided to strategies by the Agency:

class CustomStrategy(Strategy):
    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        # self.processor is automatically available
        if self.processor:
            result = await self.processor.run(
                agent=self.my_agent,
                input_data=input_data,
                context=context
            )
        else:
            # Fallback for testing or special cases
            result = await Runner.run(self.my_agent, input_data, context=context)

        return result.final_output

🏗️ Direct Usage

For advanced scenarios, use processors directly:

from agency.processors.processor import Processor, LangfuseMode
from agents import Agent

# Create processor
processor = Processor(
    service_name="Direct Execution Service",
    langfuse_mode=LangfuseMode.REMOTE,
    session_id="direct_session",
    user_id="direct_user"
)

# Create agent
agent = Agent(
    name="Direct Agent",
    instructions="You are a helpful assistant."
)

# Execute directly
result = await processor.run(
    agent=agent,
    input_data="Hello, how can you help me?",
    context={"source": "direct_api", "priority": "high"}
)

print(f"Response: {result.final_output}")

Best Practices

Do

# Use meaningful service names
processor = Processor(service_name="Customer Support - Billing Department")

# Provide session and user context when available
processor = Processor(
    session_id=user_session.id,
    user_id=user.id
)

# Handle processor availability gracefully in strategies
if self.processor:
    result = await self.processor.run(agent, input_data, context)
else:
    self.logger.warning("No processor available")
    result = await Runner.run(agent, input_data, context=context)

# Use appropriate Langfuse modes for your environment
processor = Processor(
    langfuse_mode=LangfuseMode.LOCAL  # for development
    # langfuse_mode=LangfuseMode.REMOTE  # for production
)

Avoid

# Don't create new processors for each execution
for input_data in inputs:
    processor = Processor()  # Inefficient!
    result = await processor.run(agent, input_data)

# Don't ignore processor setup errors silently
try:
    processor = Processor(langfuse_mode=LangfuseMode.REMOTE)
except Exception:
    pass  # This could lead to unexpected behavior

# Don't hardcode service names
processor = Processor(service_name="Service1")  # Not descriptive

Troubleshooting

🔧 Common Issues

Langfuse Connection Failed

# Check if local Langfuse is running
# curl http://localhost:3000/api/public/health

# Verify environment variables for remote
import os
print("LANGFUSE_HOST:", os.getenv("LANGFUSE_HOST"))
print("LANGFUSE_PUBLIC_KEY:", os.getenv("LANGFUSE_PUBLIC_KEY"))
print("LANGFUSE_SECRET_KEY:", "***" if os.getenv("LANGFUSE_SECRET_KEY") else "Not set")

Performance Issues

# Disable auto-flush for high-throughput scenarios
processor = Processor(auto_flush=False)

# Use DISABLED mode for maximum performance
processor = Processor(langfuse_mode=LangfuseMode.DISABLED)

# Monitor execution times
start_time = time.time()
result = await processor.run(agent, input_data)
execution_time = time.time() - start_time
print(f"Execution took: {execution_time:.2f}s")

Temporal Integration

🔄 Processors in Temporal Activities

Processors seamlessly integrate with Temporal workflows, providing the same observability and error handling within durable activities:

@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    strategy_overrides: dict = None,
    context: dict = None
) -> StrategyResult:
    """Execute strategy in Temporal activity with processor observability."""

    # Processor automatically adds Temporal context
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
    )

    # Context automatically includes workflow information
    if workflow.info():
        context.update({
            "workflow_id": workflow.info().workflow_id,
            "workflow_run_id": workflow.info().run_id,
            "execution_context": "temporal_activity",
            "execution_mode": "temporal",
        })

    # Execute strategy with full observability
    strategy = StrategyRegistry.get(strategy_name)(**strategy_overrides)
    strategy.set_processor(processor)
    result = await strategy.execute(input_data, context)

    return result

🔗 Trace Correlation

Processors automatically correlate Temporal workflows with Langfuse traces:

# Temporal workflow ID flows into Langfuse spans
span_metadata = {
    "workflow_id": "customer-inquiry-123",
    "workflow_run_id": "abc-def-456",
    "execution_mode": "temporal",
    "strategy_name": "router",
    "service_name": "Customer Support",
}

# Search Langfuse by workflow_id to find all related traces

🧩 Session Continuity

Sessions persist across Temporal activity executions:

# Activity 1: Initial inquiry
session = SessionFactory.create_user_session("user123", "temp_sessions")

# Activity 2: Follow-up (same session)
# Session is automatically reloaded using user_id
session = SessionFactory.create_user_session("user123", "temp_sessions")
# Conversation history is preserved

Benefits in Temporal

  • Durability: Processor observability survives activity retries
  • Consistency: Same observability patterns across direct and Temporal execution
  • Debugging: Full trace correlation between Temporal UI and Langfuse
  • Recovery: Failed activities include rich error context
  • Scaling: Processors work seamlessly across multiple workers

Observability Strategy

Development Environment: - Use LangfuseMode.LOCAL with local Langfuse instance - Enable auto_flush=True for immediate feedback - Use descriptive service names for easy identification

Production Environment: - Use LangfuseMode.REMOTE with hosted Langfuse - Consider auto_flush=False for high-throughput applications - Implement proper session and user tracking

Testing Environment: - Use LangfuseMode.DISABLED for unit tests - Mock processors for isolated strategy testing - Enable detailed logging for debugging

Performance Considerations

  • Processors are designed to be long-lived; avoid creating new instances frequently
  • Auto-flush adds latency but provides immediate observability
  • Langfuse spans add overhead but provide invaluable debugging information
  • Consider your throughput requirements when choosing observability modes