Agency Framework
The Agency class is the central orchestrator of the digitalNXT Agency framework. It provides a high-level interface for managing AI agent workflows with integrated observability, error handling, and context management.
Core Concepts
🏛️ Agency as Orchestrator
The Agency acts as a central coordinator that: - Manages strategy execution with shared context - Provides consistent observability across all operations - Handles session and user metadata globally - Integrates memory management through SQLiteSession - Ensures proper error propagation and logging
📋 Strategy Integration
Agencies work with strategies to define execution patterns: - Router Strategy: Route requests to specialist agents - Sequential Strategy: Execute agents in a specific order - Parallel Strategy: Execute multiple agents concurrently - Custom Strategies: User-defined execution logic
Architecture Overview
graph TD
Agency[🏛️ Agency] --> Strategy[📋 Strategy]
Agency --> Processor[⚙️ Processor]
Agency --> Logger[📝 Centralized Logger]
Agency --> Session[💾 SQLiteSession]
Strategy --> |implements| BaseStrategy[📝 Base Strategy]
Strategy --> Agent1[🤖 Billing Agent]
Strategy --> Agent2[🤖 Technical Agent]
Strategy --> Agent3[🤖 Triage Agent]
Processor --> AsyncExec[🔄 Async Executor]
Processor --> SyncExec[⏱️ Sync Executor]
Processor --> LangfuseClient[📈 Langfuse Client]
Session --> |memory| InMemory[🧠 In-Memory]
Session --> |persistent| FileDB[💾 File Database]
Agency --> |returns| StrategyResult[📊 Strategy Result]
subgraph "Context & Metadata"
GlobalContext[🌍 Global Context]
SessionData[👤 Session Data]
UserData[👤 User Data]
RunContext[🔄 Run Context]
Memory[💭 Conversation Memory]
end
Agency --> GlobalContext
Agency --> SessionData
Agency --> UserData
Session --> Memory
classDef agencyClass fill:#f3e5f5
classDef strategyClass fill:#e8f5e8
classDef processorClass fill:#fff3e0
classDef contextClass fill:#e3f2fd
class Agency,StrategyResult agencyClass
class Strategy,BaseStrategy,Agent1,Agent2,Agent3 strategyClass
class Processor,AsyncExec,SyncExec,LangfuseClient processorClass
class GlobalContext,SessionData,UserData,RunContext contextClass
Agency Class Structure
🔧 Initialization
from agency import Agency
from agency.strategies.router import RouterStrategy
from agency.processors.processor import LangfuseMode
from agency.sessions.session_factory import SessionFactory
# Create session for memory
session = SessionFactory.create_file_session(
session_id="user_456",
db_path="data/sessions/user_456.db"
)
# Basic setup with memory
strategy = RouterStrategy()
agency = Agency(
name="Customer Support Agency",
strategy=strategy,
context={"org_id": "123", "environment": "production"},
metadata={"version": "1.0", "deployment": "us-west-2"},
langfuse_mode=LangfuseMode.AUTO,
session_id="session_789",
user_id="user_456",
auto_flush=True,
session=session # Add session for memory
)
📊 Core Methods
async run() - Execute Strategy
result = await agency.run(
input_data="I need help with my billing",
run_context={"priority": "high", "channel": "web"},
tags=["billing", "customer-support"] # Optional, maintained for compatibility
)
get_service_info() - System Information
info = agency.processor.get_service_info()
# Returns: service configuration, observability type, session details
Execution Flow
sequenceDiagram
participant User
participant Agency
participant Strategy
participant Processor
participant Agents
participant Observability
User->>Agency: run(input_data, run_context)
Agency->>Agency: Setup centralized logging
Agency->>Agency: Merge contexts (global + run + session/user)
Agency->>Agency: Start execution timer
Agency->>Strategy: execute(input_data, merged_context)
Strategy->>Strategy: Initialize execution path
Strategy->>Processor: run(agent, input_data, context)
Processor->>Observability: Create Langfuse span
Processor->>Agents: Execute via OpenAI Runner
Agents-->>Processor: Agent result
Processor->>Observability: Update span with I/O
Processor-->>Strategy: Execution result
Strategy->>Strategy: Process & track execution path
Strategy-->>Agency: StrategyResult
Agency->>Agency: Add processor metadata
Agency->>Agency: Calculate total execution time
Agency->>Agency: Log completion status
Agency-->>User: Final StrategyResult
Context Management
The Agency provides sophisticated context management that merges multiple sources:
🌍 Context Hierarchy
# Context merging (higher priority overrides lower)
final_context = {
**agency.context, # Global agency context
**run_context, # Per-run context
"agency_name": agency.name, # System context
"session_id": agency.session_id,
"user_id": agency.user_id,
}
📋 Context Types
| Context Type | Purpose | Scope | Example |
|---|---|---|---|
| Global Context | Organization-wide settings | Agency lifetime | {"org_id": "123", "env": "prod"} |
| Session Context | User session data | Session lifetime | {"session_id": "sess_789"} |
| User Context | User-specific data | User lifetime | {"user_id": "user_456", "preferences": {...}} |
| Run Context | Request-specific data | Single execution | {"priority": "high", "channel": "web"} |
Observability Features
📝 Centralized Logging
# Automatic log formatting with timestamps
# 2024-01-15 14:30:25 - INFO - 🏛️ Agency 'Customer Support' initialized with strategy: RouterStrategy
# 2024-01-15 14:30:25 - INFO - 📊 Observability: langfuse_local
# 2024-01-15 14:30:26 - INFO - 🚀 Agency 'Customer Support' starting run with input: I need help with...
# 2024-01-15 14:30:28 - INFO - ✅ Agency 'Customer Support' completed successfully in 2.34s
📊 Integrated Tracing
- Langfuse Integration: Automatic trace creation and span management
- Performance Tracking: Execution time monitoring at agency and strategy levels
- Error Correlation: Errors are automatically linked to traces for debugging
🎯 Metadata Enrichment
# Automatic metadata added to results
result.extra_data["processor"] = {
"service_name": "Customer Support Agency",
"observability_type": "langfuse_local",
"session_id": "sess_789",
"user_id": "user_456"
}
Configuration Options
⚙️ Processor Configuration
from agency.processors.processor import LangfuseMode
agency = Agency(
name="My Agency",
strategy=my_strategy,
langfuse_mode=LangfuseMode.AUTO, # AUTO, LOCAL, REMOTE, DISABLED
auto_flush=True, # Flush traces immediately
session_id="session_123", # Optional session tracking
user_id="user_456" # Optional user tracking
)
🔧 Strategy Integration
# Strategies automatically receive the processor
class CustomStrategy(Strategy):
def _setup_agents(self):
self.my_agent = Agent(name="Custom Agent")
async def _execute_strategy(self, input_data: str, context: dict):
# Processor is automatically available as self.processor
# Logger is automatically available as self.logger
self.logger.info("Executing custom strategy")
return f"Processed: {input_data}"
# Agency automatically calls strategy.set_processor()
agency = Agency(name="Custom Agency", strategy=CustomStrategy())
Error Handling
🛡️ Comprehensive Error Management
try:
result = await agency.run("problematic input")
except Exception as e:
# Agency automatically logs:
# ❌ Agency 'My Agency' failed: <error details>
# ⏱️ Failed after 1.23s
# 📍 Execution path: ['step1', 'step2']
print(f"Agency execution failed: {e}")
📊 Error Context
- Execution Path Tracking: See exactly where failures occurred
- Timing Information: Understand performance impact of failures
- Context Preservation: Full context available for debugging
Memory Management
💾 Session Integration
The Agency framework integrates seamlessly with OpenAI's SQLiteSession for persistent conversation memory:
from agency.sessions.session_factory import SessionFactory
from agency.factories.agency_factory import AgencyFactory
# Create persistent session
session = SessionFactory.create_file_session(
session_id="user_123",
db_path="data/sessions/user_123.db"
)
# Create agency with memory
agency = AgencyFactory.create_agency(
config=agency_config,
session=session,
user_id="user_123"
)
# Conversations are automatically persisted
result = await agency.run("Hello, remember our previous conversation?")
🧠 Memory Types
| Memory Type | Use Case | Persistence | Performance |
|---|---|---|---|
| In-Memory | Testing, temporary sessions | No | Fastest |
| File-based | Production, long conversations | Yes | Good |
💭 Conversation Management
from agency.conversations.manager import ConversationManager
# High-level conversation API
manager = ConversationManager()
conversation = manager.create_conversation(
name="Customer Support",
config=agency_config
)
# Agency automatically uses conversation context
agency = AgencyFactory.create_agency_from_conversation(conversation)
For detailed memory management documentation, see Memory Management.
Best Practices
✅ Do
# Use descriptive agency names
agency = Agency(name="Customer Support Bot", strategy=strategy)
# Provide meaningful context
context = {
"org_id": "acme-corp",
"environment": "production",
"service_tier": "premium"
}
# Use structured run context
run_context = {
"request_id": "req_123",
"priority": "high",
"channel": "web"
}
# Handle errors appropriately
try:
result = await agency.run(input_data, run_context)
logger.info(f"Success: {result.output}")
except Exception as e:
logger.error(f"Agency execution failed: {e}")
# Implement fallback behavior
❌ Avoid
# Don't use generic names
agency = Agency(name="Agent", strategy=strategy) # Too generic
# Don't ignore context
result = await agency.run(input_data) # Missing valuable context
# Don't catch and ignore errors
try:
result = await agency.run(input_data)
except:
pass # Swallowing errors loses valuable debugging info
Example: Complete Agency Setup
import asyncio
from agency import Agency
from agency.strategies.router import RouterStrategy
from agency.processors.processor import LangfuseMode
async def main():
# Initialize strategy
strategy = RouterStrategy()
# Create agency with full configuration
agency = Agency(
name="Enterprise Customer Support",
strategy=strategy,
context={
"organization": "ACME Corp",
"environment": "production",
"service_tier": "enterprise",
"region": "us-west-2"
},
metadata={
"version": "2.1.0",
"deployment_id": "deploy_789",
"build_number": "1234"
},
langfuse_mode=LangfuseMode.AUTO,
session_id="sess_customer_123",
user_id="user_premium_456",
auto_flush=True
)
# Execute with rich context
result = await agency.run(
input_data="I'm having trouble accessing my premium features",
run_context={
"request_id": "req_urgent_789",
"priority": "high",
"channel": "web_portal",
"user_tier": "premium",
"previous_interactions": 3
}
)
# Process result
print(f"Response: {result.output}")
print(f"Execution time: {result.execution_time:.2f}s")
print(f"Path taken: {' → '.join(result.execution_path)}")
print(f"Final agent: {result.extra_data.get('final_agent', 'Unknown')}")
if __name__ == "__main__":
asyncio.run(main())
Related Topics
- Memory Management: Overview of session and conversation memory
- Conversations: High-level conversation management API
- Sessions: SQLite session factory patterns
- Factories: Agency factory patterns and configuration
- Strategies: Understanding strategy patterns and implementation
- Processors: Deep dive into execution engines
- Integration Patterns: Common usage patterns
Agency Lifecycle
Each Agency instance is designed to be long-lived and can handle multiple run() calls. The processor and strategy are initialized once and reused across executions for optimal performance.
Performance Optimization
- Reuse Agency instances across multiple runs
- Use appropriate Langfuse modes for your environment
- Consider disabling auto_flush for high-throughput scenarios
- Leverage context caching for repeated similar requests