# Processors

The Processor is the execution engine of the digitalNXT Agency framework, responsible for running AI agents with comprehensive observability, error handling, and performance monitoring. It bridges the gap between high-level strategy logic and low-level agent execution.
## Core Concepts

### ⚙️ Execution Engine

The Processor acts as a sophisticated execution wrapper that:

- Handles both synchronous and asynchronous agent execution (see the sketch below)
- Integrates seamlessly with Temporal activities for durable execution
- Provides comprehensive observability through Langfuse integration
- Manages OpenTelemetry instrumentation automatically
- Supports SQLiteSession integration for persistent memory
- Implements robust error handling and recovery mechanisms
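As a quick orientation, here is a minimal sketch of the async execution path, assembled only from the `Processor`, `Agent`, and `processor.run(...)` calls documented on this page:

```python
import asyncio

from agency.processors.processor import Processor
from agents import Agent

async def main() -> None:
    # A long-lived processor, reused across executions (see Best Practices)
    processor = Processor(service_name="Docs Example")
    agent = Agent(name="Support Agent", instructions="You are a helpful assistant.")

    result = await processor.run(agent=agent, input_data="Hello!", context={})
    print(result.final_output)

asyncio.run(main())
```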
### 📊 Observability First

Built with observability as a core principle:

- **Automatic Tracing**: every agent execution gets traced
- **Performance Monitoring**: execution times and resource usage are tracked
- **Error Correlation**: errors are automatically linked to their execution context
- **Input/Output Logging**: full request/response logging for debugging
## Processor Architecture

```mermaid
graph TD
    Processor[⚙️ Processor] --> AsyncExec[🔄 Async Executor]
    Processor --> SyncExec[⏱️ Sync Executor]
    Processor --> LangfuseClient[📈 Langfuse Client]
    Processor --> Logger[📝 Logger]

    AsyncExec --> |async| OpenAIRunner[🧠 OpenAI Runner]
    SyncExec --> |sync| OpenAIRunner

    LangfuseClient --> |local| LocalLangfuse[🏠 Local Langfuse]
    LangfuseClient --> |remote| RemoteLangfuse[🌐 Remote Langfuse]

    subgraph "Observability Stack"
        LocalLangfuse --> Spans[📋 Spans]
        RemoteLangfuse --> Spans
        Logger --> LogEntries[📝 Log Entries]
        Spans --> TraceAnalysis[📊 Trace Analysis]
    end

    subgraph "Configuration Modes"
        AutoMode[🔄 Auto Mode]
        LocalMode[🏠 Local Mode]
        RemoteMode[🌐 Remote Mode]
        DisabledMode[🚫 Disabled Mode]
    end

    Processor --> AutoMode
    Processor --> LocalMode
    Processor --> RemoteMode
    Processor --> DisabledMode

    classDef processorClass fill:#fff3e0
    classDef executorClass fill:#e8f5e8
    classDef observabilityClass fill:#fce4ec
    classDef configClass fill:#e3f2fd

    class Processor processorClass
    class AsyncExec,SyncExec,OpenAIRunner executorClass
    class LangfuseClient,LocalLangfuse,RemoteLangfuse,Logger,Spans,LogEntries,TraceAnalysis observabilityClass
    class AutoMode,LocalMode,RemoteMode,DisabledMode configClass
```
## Processor Components

### 🔄 Async Executor

Handles asynchronous agent execution with full observability:

```python
class AsyncExecutor:
    async def run_with_langfuse_span(self, agent, input_data, context, **kwargs):
        """Execute agent with a Langfuse span for comprehensive tracing."""
        span_name = self._construct_span_name(context, agent)
        with self.processor.langfuse_client.start_as_current_span(name=span_name) as span:
            try:
                result = await Runner.run(agent, input_data, context=context, **kwargs)
                # Update span with execution details
                span.update(
                    input=input_data,
                    output=result.final_output,
                    tags=["agent-execution", f"agent-{agent.name}"],
                    metadata={
                        "agent_name": agent.name,
                        "service": self.processor.service_name,
                        "session_id": self.processor.session_id,
                        "user_id": self.processor.user_id,
                    },
                )
                return result
            except Exception as e:
                # Comprehensive error tracking
                span.update(
                    input=input_data,
                    output=f"Error: {str(e)}",
                    tags=["agent-execution", f"agent-{agent.name}", "error"],
                    metadata={
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise
```
### ⏱️ Sync Executor

Provides synchronous execution capabilities with identical observability features:

```python
class SyncExecutor:
    def run_with_langfuse_span(self, agent, input_data, context, **kwargs):
        """Synchronous version with the same observability features."""
        # Mirrors AsyncExecutor.run_with_langfuse_span (span naming, updates,
        # and error tracking), but executes via Runner.run_sync()
        span_name = self._construct_span_name(context, agent)
        with self.processor.langfuse_client.start_as_current_span(name=span_name):
            return Runner.run_sync(agent, input_data, context=context, **kwargs)
```
## Langfuse Integration Modes

### 🔄 Auto Mode (Default)

Automatically detects the best observability option:

```mermaid
graph TD
    AutoMode[🔄 Auto Mode] --> CheckLocal{🏠 Local Available?}
    CheckLocal --> |Yes| UseLocal[✅ Use Local Langfuse]
    CheckLocal --> |No| CheckRemote{🌐 Remote Configured?}
    CheckRemote --> |Yes| UseRemote[✅ Use Remote Langfuse]
    CheckRemote --> |No| UseBasic[📝 Use Basic Logging]

    UseLocal --> Success[🎯 Full Observability]
    UseRemote --> Success
    UseBasic --> Limited[📋 Limited Observability]

    classDef autoClass fill:#e3f2fd
    classDef checkClass fill:#fff3e0
    classDef successClass fill:#e8f5e8
    classDef limitedClass fill:#fce4ec

    class AutoMode autoClass
    class CheckLocal,CheckRemote checkClass
    class UseLocal,UseRemote,Success successClass
    class UseBasic,Limited limitedClass
```
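To see which backend Auto Mode actually selected at runtime, inspect the processor's service info; this snippet relies only on the `get_service_info()` helper documented under Service Information below:

```python
from agency.processors.processor import Processor, LangfuseMode

processor = Processor(langfuse_mode=LangfuseMode.AUTO)

# One of "langfuse_local", "langfuse_remote", "basic", or "disabled"
print(processor.get_service_info()["observability_type"])
```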
### 🏠 Local Mode

Forces a connection to a local Langfuse instance (ideal for development):

```python
# Local Langfuse defaults
LOCAL_LANGFUSE_CONFIG = {
    "host": "http://localhost:3000",
    "public_key": "pk-lf-1234567890abcdef",
    "secret_key": "sk-lf-1234567890abcdef1234567890abcdef",
}

processor = Processor(langfuse_mode=LangfuseMode.LOCAL)
```
### 🌐 Remote Mode

Uses a production Langfuse instance configured via environment variables:

```bash
# Required environment variables
LANGFUSE_PUBLIC_KEY=your_public_key
LANGFUSE_SECRET_KEY=your_secret_key
LANGFUSE_HOST=https://your-langfuse-instance.com
```

```python
processor = Processor(langfuse_mode=LangfuseMode.REMOTE)
```
### 🚫 Disabled Mode

Completely disables observability for maximum performance:
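```python
processor = Processor(langfuse_mode=LangfuseMode.DISABLED)
```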
## Processor Configuration

### 🔧 Basic Configuration

```python
import logging

from agency.processors.processor import Processor, LangfuseMode
from agents import RunConfig

custom_logger = logging.getLogger("agency")  # any logging.Logger works here

# Minimal configuration
processor = Processor()

# Full configuration
processor = Processor(
    service_name="Customer Support Service",
    enable_observability=True,
    langfuse_mode=LangfuseMode.AUTO,
    run_config=RunConfig(),
    auto_flush=True,
    session_id="session_123",
    user_id="user_456",
    logger=custom_logger,
)
```
### 📊 Advanced Configuration Options

| Parameter | Type | Default | Description |
|---|---|---|---|
| `service_name` | `str` | `"Agency workflow"` | Service identifier for tracing |
| `enable_observability` | `bool` | `True` | Enable/disable observability features |
| `langfuse_mode` | `LangfuseMode` | `AUTO` | Langfuse connection strategy |
| `run_config` | `RunConfig` | `RunConfig()` | OpenAI Runner configuration |
| `auto_flush` | `bool` | `True` | Automatically flush traces after execution |
| `session_id` | `str` | `None` | Session ID for trace grouping |
| `user_id` | `str` | `None` | User ID for trace attribution |
| `logger` | `Logger` | `None` | Custom logger instance |
## Execution Flow

### 🔄 Async Execution Flow

```mermaid
sequenceDiagram
    participant Strategy
    participant Processor
    participant AsyncExecutor
    participant LangfuseClient
    participant OpenAIRunner
    participant Agent

    Strategy->>Processor: run(agent, input_data, context)
    Processor->>Processor: Log start: "🚀 Running agent 'AgentName'"

    alt Langfuse Available
        Processor->>AsyncExecutor: run_with_langfuse_span()
        AsyncExecutor->>LangfuseClient: start_as_current_span()
        LangfuseClient-->>AsyncExecutor: span context
        AsyncExecutor->>OpenAIRunner: Runner.run(agent, input, context)
        OpenAIRunner->>Agent: Execute agent logic
        Agent-->>OpenAIRunner: Agent response
        OpenAIRunner-->>AsyncExecutor: Execution result
        AsyncExecutor->>LangfuseClient: span.update(input, output, metadata)
        alt auto_flush enabled
            AsyncExecutor->>LangfuseClient: flush()
        end
        AsyncExecutor-->>Processor: Final result
    else No Langfuse
        Processor->>AsyncExecutor: run_without_langfuse_span()
        AsyncExecutor->>OpenAIRunner: Runner.run(agent, input, context)
        OpenAIRunner-->>AsyncExecutor: Execution result
        AsyncExecutor-->>Processor: Final result
    end

    Processor->>Processor: Log completion: "✅ Agent completed successfully"
    Processor->>Processor: Log result preview: "📝 Result: ..."
    Processor-->>Strategy: Execution result
```
### ⏱️ Sync Execution Flow

Identical to the async flow, but uses `Runner.run_sync()` instead of `Runner.run()`.
## Span Construction and Naming

### 🏷️ Intelligent Span Naming

The processor constructs meaningful span names based on context:

```python
def _construct_span_name(self, context, agent):
    """Construct contextual span names for better observability."""
    agency_name = context.get("agency_name") if context else None
    strategy_name = context.get("strategy_name") if context else None

    if agency_name and strategy_name:
        return f"{agency_name} - {strategy_name}"
    elif agency_name:
        return f"{agency_name} - Agent"
    elif strategy_name:
        return f"{strategy_name} - Agent"
    else:
        return f"Agent - {agent.name}"
```
**Example Span Names:**

- `"Customer Support - RouterStrategy"` (full context)
- `"Customer Support - Agent"` (agency only)
- `"RouterStrategy - Agent"` (strategy only)
- `"Agent - Billing Specialist"` (agent fallback)
### 📊 Span Metadata

Every span includes comprehensive metadata:

```python
span_metadata = {
    "agent_name": agent.name,
    "service": self.processor.service_name,
    "session_id": self.processor.session_id,
    "user_id": self.processor.user_id,
    # Error cases also include:
    "error": str(exception),
    "error_type": type(exception).__name__,
}
```
## Error Handling

### 🛡️ Comprehensive Error Management

The processor provides robust error handling at multiple levels:

#### Exception Propagation with Context

```python
try:
    result = await processor.run(agent, input_data, context)
except Exception as e:
    # The processor automatically:
    # 1. Logs the error with agent context
    # 2. Updates the Langfuse span with error details
    # 3. Preserves the full stack trace
    # 4. Re-raises for higher-level handling
    logger.error(f"Agent execution failed: {e}")
    raise
```
#### Error Span Updates

```python
# On error, spans are updated with detailed error information
span.update(
    input=input_data,
    output=f"Error: {str(exception)}",
    tags=["agent-execution", f"agent-{agent.name}", "error"],
    metadata={
        "agent_name": agent.name,
        "service": self.processor.service_name,
        "error": str(exception),
        "error_type": type(exception).__name__,
        "session_id": self.processor.session_id,
        "user_id": self.processor.user_id,
    },
)
```
## Performance Features

### ⚡ Optimization Strategies

#### Auto-Flush Control

```python
# Enable for real-time monitoring (slight performance cost)
processor = Processor(auto_flush=True)

# Disable for high-throughput scenarios
processor = Processor(auto_flush=False)

# Manually flush when needed
processor.langfuse_client.flush()
```
#### Connection Reuse

```python
# Processors are designed to be long-lived:
# create once, use many times
processor = Processor(service_name="Production Service")

# Reuse across multiple executions
for input_data in batch_inputs:
    result = await processor.run(agent, input_data, context)
```
#### Efficient Logging

```python
# Output preview truncation prevents log bloat:
# long outputs are automatically truncated to 200 characters
self.logger.info(f"📝 Result: {output_preview[:200]}...")
```
## Service Information

### 📋 Runtime Information

Get the complete processor configuration at runtime:

```python
info = processor.get_service_info()
print(info)

# Example output:
{
    "service_name": "Customer Support Service",
    "observability_type": "langfuse_local",  # or "langfuse_remote", "basic", "disabled"
    "langfuse_mode": "auto",
    "auto_flush": True,
    "session_id": "session_123",
    "user_id": "user_456",
}
```
## Integration Patterns

### 🔌 Strategy Integration

Processors are automatically provided to strategies by the Agency:

```python
class CustomStrategy(Strategy):
    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        # self.processor is automatically available
        if self.processor:
            result = await self.processor.run(
                agent=self.my_agent,
                input_data=input_data,
                context=context,
            )
        else:
            # Fallback for testing or special cases
            result = await Runner.run(self.my_agent, input_data, context=context)
        return result.final_output
```
### 🏗️ Direct Usage

For advanced scenarios, use processors directly:

```python
from agency.processors.processor import Processor, LangfuseMode
from agents import Agent

# Create processor
processor = Processor(
    service_name="Direct Execution Service",
    langfuse_mode=LangfuseMode.REMOTE,
    session_id="direct_session",
    user_id="direct_user",
)

# Create agent
agent = Agent(
    name="Direct Agent",
    instructions="You are a helpful assistant.",
)

# Execute directly
result = await processor.run(
    agent=agent,
    input_data="Hello, how can you help me?",
    context={"source": "direct_api", "priority": "high"},
)

print(f"Response: {result.final_output}")
```
## Best Practices

### ✅ Do

```python
# Use meaningful service names
processor = Processor(service_name="Customer Support - Billing Department")

# Provide session and user context when available
processor = Processor(
    session_id=user_session.id,
    user_id=user.id,
)

# Handle processor availability gracefully in strategies
if self.processor:
    result = await self.processor.run(agent, input_data, context)
else:
    self.logger.warning("No processor available")
    result = await Runner.run(agent, input_data, context=context)

# Use appropriate Langfuse modes for your environment
processor = Processor(
    langfuse_mode=LangfuseMode.LOCAL  # for development
    # langfuse_mode=LangfuseMode.REMOTE  # for production
)
```
### ❌ Avoid

```python
# Don't create new processors for each execution
for input_data in inputs:
    processor = Processor()  # Inefficient!
    result = await processor.run(agent, input_data)

# Don't ignore processor setup errors silently
try:
    processor = Processor(langfuse_mode=LangfuseMode.REMOTE)
except Exception:
    pass  # This could lead to unexpected behavior

# Don't use generic service names
processor = Processor(service_name="Service1")  # Not descriptive
```
## Troubleshooting

### 🔧 Common Issues

#### Langfuse Connection Failed

```python
# Check whether local Langfuse is running:
#   curl http://localhost:3000/api/public/health

# Verify environment variables for remote mode
import os
print("LANGFUSE_HOST:", os.getenv("LANGFUSE_HOST"))
print("LANGFUSE_PUBLIC_KEY:", os.getenv("LANGFUSE_PUBLIC_KEY"))
print("LANGFUSE_SECRET_KEY:", "***" if os.getenv("LANGFUSE_SECRET_KEY") else "Not set")
```
#### Performance Issues

```python
import time

# Disable auto-flush for high-throughput scenarios
processor = Processor(auto_flush=False)

# Use DISABLED mode for maximum performance
processor = Processor(langfuse_mode=LangfuseMode.DISABLED)

# Monitor execution times
start_time = time.time()
result = await processor.run(agent, input_data)
execution_time = time.time() - start_time
print(f"Execution took: {execution_time:.2f}s")
```
## Temporal Integration

### 🔄 Processors in Temporal Activities

Processors seamlessly integrate with Temporal workflows, providing the same observability and error handling within durable activities:
```python
from temporalio import activity

# Processor, LangfuseMode, StrategyRegistry, and StrategyResult
# come from the framework packages shown elsewhere on this page.

@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    strategy_overrides: dict = None,
    context: dict = None,
) -> StrategyResult:
    """Execute a strategy in a Temporal activity with processor observability."""
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
    )

    # Context automatically includes workflow information
    # (inside an activity, activity.info() exposes the calling workflow's IDs)
    context = context or {}
    info = activity.info()
    context.update({
        "workflow_id": info.workflow_id,
        "workflow_run_id": info.workflow_run_id,
        "execution_context": "temporal_activity",
        "execution_mode": "temporal",
    })

    # Execute the strategy with full observability
    strategy = StrategyRegistry.get(strategy_name)(**(strategy_overrides or {}))
    strategy.set_processor(processor)
    result = await strategy.execute(input_data, context)
    return result
```
### 🔗 Trace Correlation

Processors automatically correlate Temporal workflows with Langfuse traces:

```python
# The Temporal workflow ID flows into Langfuse spans
span_metadata = {
    "workflow_id": "customer-inquiry-123",
    "workflow_run_id": "abc-def-456",
    "execution_mode": "temporal",
    "strategy_name": "router",
    "service_name": "Customer Support",
}

# Search Langfuse by workflow_id to find all related traces
```
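That search can also be done programmatically; a rough sketch assuming the Langfuse v2 Python SDK's `fetch_traces` helper, with the `workflow_id` metadata filtered client-side:

```python
from langfuse import Langfuse

langfuse = Langfuse()  # reads the LANGFUSE_* environment variables

# Fetch recent traces, then filter client-side on the workflow_id metadata
traces = langfuse.fetch_traces(limit=50).data
related = [t for t in traces if (t.metadata or {}).get("workflow_id") == "customer-inquiry-123"]

for trace in related:
    print(trace.id, trace.name)
```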
### 🧩 Session Continuity

Sessions persist across Temporal activity executions:

```python
# Activity 1: Initial inquiry
session = SessionFactory.create_user_session("user123", "temp_sessions")

# Activity 2: Follow-up (same session)
# The session is automatically reloaded using the user_id,
# so conversation history is preserved
session = SessionFactory.create_user_session("user123", "temp_sessions")
```
### ⚡ Benefits in Temporal

- **Durability**: Processor observability survives activity retries
- **Consistency**: Same observability patterns across direct and Temporal execution
- **Debugging**: Full trace correlation between the Temporal UI and Langfuse
- **Recovery**: Failed activities include rich error context
- **Scaling**: Processors work seamlessly across multiple workers
## Related Topics

- **Temporal Workflows**: Understanding workflow orchestration and processor integration
- **Temporal Workers**: Worker configuration and processor usage
- **Observability**: The complete observability stack with OpenTelemetry
- **Agency Framework**: Understanding processor integration with agencies
- **Strategies**: How strategies use processors for agent execution
- **Integration Patterns**: Common processor usage patterns
## Observability Strategy

**Development Environment:**

- Use `LangfuseMode.LOCAL` with a local Langfuse instance
- Enable `auto_flush=True` for immediate feedback
- Use descriptive service names for easy identification

**Production Environment:**

- Use `LangfuseMode.REMOTE` with hosted Langfuse
- Consider `auto_flush=False` for high-throughput applications
- Implement proper session and user tracking

**Testing Environment:**

- Use `LangfuseMode.DISABLED` for unit tests
- Mock processors for isolated strategy testing (see the sketch below)
- Enable detailed logging for debugging
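One way to mock the processor for isolated strategy tests; a sketch using `unittest.mock`, where `CustomStrategy`, `set_processor`, and `execute` are taken from the examples above and `pytest-asyncio` is assumed for the async test:

```python
import pytest
from types import SimpleNamespace
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_strategy_with_mocked_processor():
    # Stub processor: run() resolves to an object exposing final_output,
    # matching what strategies read from real execution results
    mock_processor = SimpleNamespace(
        run=AsyncMock(return_value=SimpleNamespace(final_output="stubbed reply"))
    )

    strategy = CustomStrategy()             # strategy class from Strategy Integration above
    strategy.set_processor(mock_processor)  # setter shown in the Temporal example

    await strategy.execute("test input", context={})

    # The strategy should have delegated exactly one run to the processor
    mock_processor.run.assert_awaited_once()
```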
## Performance Considerations

- Processors are designed to be long-lived; avoid creating new instances frequently
- Auto-flush adds latency but provides immediate observability
- Langfuse spans add overhead but provide invaluable debugging information
- Consider your throughput requirements when choosing observability modes (see the batching sketch below)
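For high-throughput pipelines, a common pattern (a sketch built from the `auto_flush` flag and the `langfuse_client.flush()` call shown above) is to disable per-execution flushing and flush once per batch:

```python
from agency.processors.processor import Processor

# auto_flush=False avoids a trace flush after every run
processor = Processor(service_name="Batch Service", auto_flush=False)

async def process_batch(agent, batch_inputs, context):
    results = []
    for input_data in batch_inputs:
        results.append(await processor.run(agent, input_data, context))

    # One manual flush for the whole batch instead of one per execution
    processor.langfuse_client.flush()
    return results
```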