Integration Patterns
This guide covers common patterns and best practices for integrating the digitalNXT Agency framework into your applications, from simple single-agent scenarios to complex multi-agency orchestration.
Common Integration Patterns
Pattern 1: Simple Agent Wrapper
Wrap a single agent to get the Agency framework's benefits:
from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent, Runner


class SimpleAgentStrategy(Strategy):
    """Strategy that wraps a single agent with Agency benefits."""

    def _setup_agents(self):
        # Use an injected agent if one was passed as an override
        self.agent = self.overrides.get('agent') or Agent(
            name="Assistant Agent",
            instructions="You are a helpful AI assistant."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        self._add_to_execution_path("single_agent_execution")

        if self.processor:
            result = await self.processor.run(self.agent, input_data, context)
        else:
            # Fallback: run the agent directly when no processor is configured
            result = await Runner.run(self.agent, input_data, context=context)
        return result.final_output


# Usage
async def simple_agent_example():
    strategy = SimpleAgentStrategy()
    agency = Agency(
        name="Simple Assistant",
        strategy=strategy,
        session_id="simple_session_123"
    )

    result = await agency.run("What is the capital of France?")
    print(f"Response: {result.output}")
    print(f"Execution time: {result.execution_time:.2f}s")
Benefits:
- Automatic observability and tracing
- Consistent error handling and logging
- Session and user tracking
- Performance monitoring
- Easy upgrade path to more complex patterns
Pattern 2: Request Router
Route different types of requests to specialized agents:
from agency import Agency
from agency.strategies.router import RouterStrategy


async def request_router_example():
    """Route requests to appropriate specialists."""
    # Use built-in RouterStrategy
    strategy = RouterStrategy()
    agency = Agency(
        name="Customer Support Router",
        strategy=strategy,
        context={
            "company": "ACME Corp",
            "support_level": "tier_1"
        }
    )

    # Different request types get routed automatically
    requests = [
        "I have a question about my monthly bill",
        "The app keeps crashing on my phone",
        "How do I reset my password?",
        "I want to cancel my subscription"
    ]

    for request in requests:
        result = await agency.run(request)
        final_agent = result.extra_data.get('final_agent', 'Unknown')
        print(f"Request: {request[:30]}...")
        print(f"Handled by: {final_agent}")
        print(f"Response: {result.output[:100]}...")
        print(f"Path: {' → '.join(result.execution_path)}")
        print("-" * 50)
Use Cases:
- Customer support systems
- Help desk automation
- Multi-domain chatbots
- Content classification and routing
Pattern 3: Multi-Step Processing Pipeline
Chain multiple agents for complex processing:
from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent, Runner


class ProcessingPipelineStrategy(Strategy):
    """Multi-step processing with different specialized agents."""

    def _setup_agents(self):
        self.extractor = self.overrides.get('extractor') or Agent(
            name="Data Extractor",
            instructions="Extract key information and entities from the input text."
        )
        self.analyzer = self.overrides.get('analyzer') or Agent(
            name="Content Analyzer",
            instructions="Analyze the extracted information and provide insights."
        )
        self.summarizer = self.overrides.get('summarizer') or Agent(
            name="Content Summarizer",
            instructions="Create a concise summary of the analysis."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        """Execute multi-step processing pipeline."""
        # Step 1: Extract information
        self._add_to_execution_path("information_extraction")
        extraction_result = await self._run_agent(
            self.extractor,
            input_data,
            context
        )

        # Step 2: Analyze extracted information
        self._add_to_execution_path("content_analysis")
        analysis_prompt = f"Analyze this extracted information:\n{extraction_result}"
        analysis_result = await self._run_agent(
            self.analyzer,
            analysis_prompt,
            context
        )

        # Step 3: Summarize analysis
        self._add_to_execution_path("content_summarization")
        summary_prompt = f"Summarize this analysis:\n{analysis_result}"
        final_result = await self._run_agent(
            self.summarizer,
            summary_prompt,
            context
        )

        # Store pipeline metadata
        self.extra_data = {
            "pipeline_stages": 3,
            "extraction_length": len(extraction_result),
            "analysis_length": len(analysis_result),
            "summary_length": len(final_result)
        }
        return final_result

    async def _run_agent(self, agent, input_data: str, context: dict) -> str:
        """Helper method to run agents consistently."""
        if self.processor:
            result = await self.processor.run(agent, input_data, context)
        else:
            # Fallback: run the agent directly when no processor is configured
            result = await Runner.run(agent, input_data, context=context)
        return result.final_output


async def pipeline_example():
    """Process complex content through a multi-step pipeline."""
    strategy = ProcessingPipelineStrategy()
    agency = Agency(
        name="Content Processing Pipeline",
        strategy=strategy,
        context={"domain": "business_analysis"}
    )

    complex_input = """
    Q3 2024 Sales Report: Our revenue increased by 23% compared to Q2,
    reaching $4.2M. The growth was primarily driven by our new product
    line which accounted for 45% of sales. Customer satisfaction scores
    improved to 4.6/5. However, we faced supply chain challenges that
    increased costs by 8%. Our main competitor launched a similar product
    but at a higher price point.
    """

    result = await agency.run(complex_input)
    print("Pipeline Result:")
    print(f"Final Output: {result.output}")
    print(f"Execution Path: {' → '.join(result.execution_path)}")
    print(f"Pipeline Stages: {result.extra_data['pipeline_stages']}")
    print(f"Processing Time: {result.execution_time:.2f}s")
Pattern 4: Parallel Processing
Execute multiple agents concurrently for different perspectives:
import asyncio

from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent, Runner


class ParallelAnalysisStrategy(Strategy):
    """Run multiple agents in parallel and combine results."""

    def _setup_agents(self):
        self.financial_analyst = Agent(
            name="Financial Analyst",
            instructions="Analyze the financial aspects and implications."
        )
        self.risk_analyst = Agent(
            name="Risk Analyst",
            instructions="Identify potential risks and mitigation strategies."
        )
        self.market_analyst = Agent(
            name="Market Analyst",
            instructions="Analyze market trends and competitive landscape."
        )
        self.synthesizer = Agent(
            name="Analysis Synthesizer",
            instructions="Synthesize multiple analyses into a coherent summary."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        """Execute parallel analysis and synthesis."""
        # Run analysts in parallel
        self._add_to_execution_path("parallel_analysis")
        analysis_tasks = [
            self._run_agent(self.financial_analyst, input_data, context, "financial"),
            self._run_agent(self.risk_analyst, input_data, context, "risk"),
            self._run_agent(self.market_analyst, input_data, context, "market")
        ]

        # Wait for all analyses to complete; results keep the order of analysis_tasks
        analyses = await asyncio.gather(*analysis_tasks)

        # Combine all analyses
        self._add_to_execution_path("analysis_synthesis")
        combined_analysis = "\n\n".join([
            f"Financial Analysis: {analyses[0]}",
            f"Risk Analysis: {analyses[1]}",
            f"Market Analysis: {analyses[2]}"
        ])

        # Synthesize final result
        synthesis_prompt = f"Synthesize these analyses into a comprehensive summary:\n{combined_analysis}"
        final_result = await self._run_agent(
            self.synthesizer,
            synthesis_prompt,
            context,
            "synthesis"
        )

        # Store parallel execution metadata
        self.extra_data = {
            "parallel_agents": 3,
            "analysis_types": ["financial", "risk", "market"],
            "total_analysis_length": sum(len(a) for a in analyses),
            "synthesis_length": len(final_result)
        }
        return final_result

    async def _run_agent(self, agent, input_data: str, context: dict, analysis_type: str) -> str:
        """Run agent with analysis type tracking."""
        if self.processor:
            result = await self.processor.run(agent, input_data, context)
        else:
            # Fallback: run the agent directly when no processor is configured
            result = await Runner.run(agent, input_data, context=context)

        # Track which analysis completed (completion order may vary between runs)
        self._add_to_execution_path(f"{analysis_type}_analysis_completed")
        return result.final_output


async def parallel_processing_example():
    """Analyze business scenario from multiple perspectives simultaneously."""
    strategy = ParallelAnalysisStrategy()
    agency = Agency(
        name="Multi-Perspective Business Analyzer",
        strategy=strategy
    )

    business_scenario = """
    TechCorp is considering acquiring StartupX for $50M. StartupX has
    revolutionary AI technology but limited revenue ($2M annually).
    The acquisition would give TechCorp access to 15 key patents and
    a team of 25 AI researchers. However, StartupX has $8M in debt
    and faces a lawsuit over IP infringement.
    """

    result = await agency.run(business_scenario)
    print("Parallel Analysis Result:")
    print(f"Comprehensive Analysis: {result.output}")
    print(f"Execution Path: {result.execution_path}")
    print(f"Parallel Agents Used: {result.extra_data['parallel_agents']}")
    print(f"Analysis Types: {result.extra_data['analysis_types']}")
    print(f"Total Execution Time: {result.execution_time:.2f}s")
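The strategy above does not say what happens when one analyst fails. With a plain `asyncio.gather`, the first exception propagates to the caller and the other results are lost to it. The following is a minimal sketch of one way to degrade gracefully using `return_exceptions=True`. The `fake_analyst` coroutine and the `gather_analyses` helper are hypothetical stand-ins for illustration and are not part of the Agency framework.

```python
import asyncio


async def fake_analyst(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for an agent call."""
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} analyst unavailable")
    return f"{name} analysis"


async def gather_analyses(tasks: dict) -> tuple[dict, dict]:
    """Run all coroutines concurrently and split successes from failures."""
    # return_exceptions=True returns raised exceptions as results
    # instead of propagating the first one.
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    succeeded, failed = {}, {}
    # gather preserves input order, so names and results line up.
    for name, result in zip(tasks.keys(), results):
        if isinstance(result, BaseException):
            failed[name] = str(result)
        else:
            succeeded[name] = result
    return succeeded, failed


async def main():
    succeeded, failed = await gather_analyses({
        "financial": fake_analyst("financial"),
        "risk": fake_analyst("risk", fail=True),
        "market": fake_analyst("market"),
    })
    print("Succeeded:", succeeded)
    print("Failed:", failed)


if __name__ == "__main__":
    asyncio.run(main())
```

A synthesizer step could then work from the successful analyses only and record the failed ones in `extra_data`.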
Pattern 5: Memory-Enabled Conversations
Integrate persistent memory for stateful conversations:
from agency.factories.agency_factory import AgencyFactory
from agency.sessions.session_factory import SessionFactory
from agency.conversations.manager import ConversationManager
# AgencyConfig must also be imported; its module path is not shown in this guide.


class ConversationalAgencyPattern:
    """Pattern for creating conversational agencies with persistent memory."""

    def __init__(self, base_dir: str = "data"):
        self.base_dir = base_dir
        self.conversation_manager = ConversationManager(
            base_dir=f"{base_dir}/conversations",
            session_dir=f"{base_dir}/sessions"
        )
        self.active_agencies = {}

    async def get_or_create_agency(self, user_id: str, conversation_name: str = None):
        """Get existing agency or create new one with memory."""
        # Reuse the cached agency so follow-up messages share one conversation
        if user_id in self.active_agencies:
            cached = self.active_agencies[user_id]
            return cached["agency"], cached["conversation"]

        # Create conversation
        conversation = self.conversation_manager.create_conversation(
            name=conversation_name or f"Chat with {user_id}",
            config=AgencyConfig(
                name="ConversationalSupport",
                strategy_type="router"
            )
        )

        # Create session for memory (stored under base_dir, not a hardcoded path)
        session = SessionFactory.create_file_session(
            session_id=f"user_{user_id}",
            db_path=f"{self.base_dir}/sessions/user_{user_id}.db"
        )

        # Create agency with memory
        agency = AgencyFactory.create_agency(
            config=conversation.config,
            session=session,
            session_id=conversation.session_info.session_id,
            user_id=user_id
        )

        # Cache agency for reuse
        self.active_agencies[user_id] = {
            "agency": agency,
            "conversation": conversation,
            "session": session
        }
        return agency, conversation

    async def chat(self, user_id: str, message: str, context: dict = None):
        """Send message with full conversation context."""
        # Get or create agency
        agency, conversation = await self.get_or_create_agency(user_id)

        # Add user message to conversation
        self.conversation_manager.add_message(
            conversation.id,
            "user",
            message,
            metadata={"user_id": user_id}
        )

        # Process with full context
        result = await agency.run(
            message,
            run_context={
                "conversation_id": conversation.id,
                "user_id": user_id,
                "message_count": len(conversation.messages) + 1,
                **(context or {})
            }
        )

        # Add assistant response to conversation
        self.conversation_manager.add_message(
            conversation.id,
            "assistant",
            result.output,
            metadata={
                "execution_time": result.execution_time,
                "execution_path": result.execution_path
            }
        )
        return result

    async def get_conversation_history(self, user_id: str, limit: int = 10):
        """Get recent conversation history."""
        if user_id in self.active_agencies:
            conversation = self.active_agencies[user_id]["conversation"]
            messages = self.conversation_manager.get_messages(conversation.id, limit=limit)
            return messages
        return []

    def cleanup_inactive_sessions(self, inactive_hours: int = 24):
        """Clean up old sessions."""
        # Implementation would check last activity and clean up
        pass


async def conversational_example():
    """Example of persistent conversational agency."""
    chat_system = ConversationalAgencyPattern()
    user_id = "user_123"

    # First conversation
    print("=== First Conversation ===")
    result1 = await chat_system.chat(
        user_id=user_id,
        message="Hi, I'm planning a trip to Japan. Can you help me?",
        context={"priority": "high"}
    )
    print(f"Assistant: {result1.output}")

    # Second message - should remember context
    print("\n=== Follow-up Message ===")
    result2 = await chat_system.chat(
        user_id=user_id,
        message="What about the weather in Tokyo in April?",
        context={"priority": "medium"}
    )
    print(f"Assistant: {result2.output}")

    # Third message - continuing conversation
    print("\n=== Another Follow-up ===")
    result3 = await chat_system.chat(
        user_id=user_id,
        message="Thanks! Can you recommend some hotels too?",
        context={"priority": "low"}
    )
    print(f"Assistant: {result3.output}")

    # Show conversation history
    print("\n=== Conversation History ===")
    history = await chat_system.get_conversation_history(user_id)
    for i, msg in enumerate(history):
        print(f"{i+1}. [{msg.role}]: {msg.content[:100]}...")
    print(f"\nTotal messages in conversation: {len(history)}")
Benefits:
- Persistent conversation memory across sessions
- User-specific context and history
- Seamless conversation continuity
- Full observability of conversation flows
- Easy integration with web applications
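The `cleanup_inactive_sessions` method in this pattern is left as a stub. The following is a minimal sketch of one way it could work, assuming the cache records a last-activity timestamp for each user. The `SessionCache` class and its `last_active` field are hypothetical and not part of the Agency framework.

```python
import time
from typing import Any, Optional


class SessionCache:
    """Hypothetical in-memory cache keyed by user ID."""

    def __init__(self) -> None:
        self._entries: dict[str, dict[str, Any]] = {}

    def touch(self, user_id: str, payload: Any, now: Optional[float] = None) -> None:
        """Store or refresh an entry and record when it was last used."""
        self._entries[user_id] = {
            "payload": payload,
            "last_active": time.time() if now is None else now,
        }

    def cleanup_inactive(self, inactive_hours: float = 24,
                         now: Optional[float] = None) -> list[str]:
        """Remove entries idle for longer than inactive_hours; return their IDs."""
        now = time.time() if now is None else now
        cutoff = now - inactive_hours * 3600
        stale = [uid for uid, entry in self._entries.items()
                 if entry["last_active"] < cutoff]
        for uid in stale:
            del self._entries[uid]
        return stale

    def __contains__(self, user_id: str) -> bool:
        return user_id in self._entries


if __name__ == "__main__":
    cache = SessionCache()
    cache.touch("user_123", payload="agency placeholder")
    print(cache.cleanup_inactive(inactive_hours=24))
```

The `now` parameter exists only to make the logic testable without waiting; production code would normally omit it and also close any file-backed session before dropping the entry.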
Pattern 6: Multi-Agency Orchestration
Coordinate multiple agencies for complex workflows:
# Reuses SimpleAgentStrategy, ProcessingPipelineStrategy, and
# ParallelAnalysisStrategy from Patterns 1, 3, and 4.
class MultiAgencyOrchestrator:
    """Orchestrate multiple agencies for complex business processes."""

    def __init__(self):
        # Setup different agencies for different domains
        self.research_agency = Agency(
            name="Research Agency",
            strategy=ProcessingPipelineStrategy(),
            context={"domain": "research"}
        )
        self.analysis_agency = Agency(
            name="Analysis Agency",
            strategy=ParallelAnalysisStrategy(),
            context={"domain": "analysis"}
        )
        self.decision_agency = Agency(
            name="Decision Support Agency",
            strategy=SimpleAgentStrategy(
                agent=Agent(
                    name="Decision Support Agent",
                    instructions="Provide clear decision recommendations based on research and analysis."
                )
            ),
            context={"domain": "decision_support"}
        )

    async def process_business_decision(self, scenario: str, context: dict = None):
        """Process a business decision through multiple agencies."""
        print("Starting multi-agency processing...")

        # Stage 1: Research
        print("Research phase...")
        research_result = await self.research_agency.run(scenario, run_context=context)

        # Stage 2: Analysis
        print("Analysis phase...")
        analysis_input = f"Scenario: {scenario}\n\nResearch: {research_result.output}"
        analysis_result = await self.analysis_agency.run(analysis_input, run_context=context)

        # Stage 3: Decision Support
        print("Decision support phase...")
        decision_input = (
            f"Scenario: {scenario}\n\n"
            f"Research: {research_result.output}\n\n"
            f"Analysis: {analysis_result.output}"
        )
        decision_result = await self.decision_agency.run(decision_input, run_context=context)

        # Combine results; stages run sequentially, so total time is the sum
        total_time = (
            research_result.execution_time
            + analysis_result.execution_time
            + decision_result.execution_time
        )
        return {
            "scenario": scenario,
            "research": research_result,
            "analysis": analysis_result,
            "decision": decision_result,
            "total_time": total_time,
            "agencies_used": ["research", "analysis", "decision_support"]
        }


async def multi_agency_example():
    """Complex business decision processing across multiple agencies."""
    orchestrator = MultiAgencyOrchestrator()
    scenario = "Should we expand our SaaS business to the European market?"

    result = await orchestrator.process_business_decision(
        scenario=scenario,
        context={"company": "TechSaaS Inc", "budget": "$2M", "timeline": "6 months"}
    )

    print("Multi-Agency Processing Complete!")
    print(f"Scenario: {result['scenario']}")
    print(f"Total Processing Time: {result['total_time']:.2f}s")
    print(f"Agencies Used: {result['agencies_used']}")
    print("\nResearch Output:")
    print(result['research'].output[:200] + "...")
    print("\nAnalysis Output:")
    print(result['analysis'].output[:200] + "...")
    print("\nDecision Recommendation:")
    print(result['decision'].output)
Web Application Integration
FastAPI Integration
Integrate the Agency framework with FastAPI for web applications:
import uuid
from typing import Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel

from agency import Agency
from agency.strategies.router import RouterStrategy

app = FastAPI(title="Agency API")


# Request/Response models
class AgencyRequest(BaseModel):
    input_data: str
    context: Optional[dict] = None
    session_id: Optional[str] = None
    user_id: Optional[str] = None


class AgencyResponse(BaseModel):
    request_id: str
    output: str
    execution_time: float
    execution_path: list[str]
    metadata: dict


# Global agency instance, created once and reused across requests
customer_support_agency = Agency(
    name="API Customer Support",
    strategy=RouterStrategy(),
    context={"service": "api", "version": "1.0"}
)


@app.post("/api/v1/process", response_model=AgencyResponse)
async def process_request(request: AgencyRequest):
    """Process request through Agency framework."""
    request_id = str(uuid.uuid4())
    try:
        # Merge request metadata into the per-run context
        run_context = {
            **(request.context or {}),
            "request_id": request_id,
            "api_version": "v1"
        }

        # Execute through agency
        result = await customer_support_agency.run(
            input_data=request.input_data,
            run_context=run_context
        )
        return AgencyResponse(
            request_id=request_id,
            output=result.output,
            execution_time=result.execution_time,
            execution_path=result.execution_path,
            metadata=result.metadata
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing failed: {e}") from e


@app.post("/api/v1/process-async")
async def process_request_async(request: AgencyRequest, background_tasks: BackgroundTasks):
    """Process request asynchronously."""
    request_id = str(uuid.uuid4())

    async def background_process():
        """Background processing task; runs after the response is sent."""
        result = await customer_support_agency.run(
            request.input_data,
            run_context=request.context
        )
        # Store result in database, send notification, etc.
        print(f"Background task {request_id} completed: {result.execution_time:.2f}s")

    background_tasks.add_task(background_process)
    return {"request_id": request_id, "status": "processing"}


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    processor_info = customer_support_agency.processor.get_service_info()
    return {
        "status": "healthy",
        "agency": "API Customer Support",
        "observability": processor_info["observability_type"],
        "langfuse_mode": processor_info["langfuse_mode"]
    }
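The `/api/v1/process-async` endpoint returns a `request_id`, but the example gives the client nothing to look it up with. The following is a minimal sketch of an in-memory job store that a status endpoint could read from. `JobStore` and its method names are hypothetical. A real deployment would more likely use a database or cache shared across worker processes.

```python
import asyncio
from typing import Any, Awaitable, Callable


class JobStore:
    """Hypothetical in-memory record of background job states."""

    def __init__(self) -> None:
        self._jobs: dict[str, dict[str, Any]] = {}

    async def run(self, request_id: str,
                  work: Callable[[], Awaitable[Any]]) -> None:
        """Run the work coroutine and record its outcome under request_id."""
        self._jobs[request_id] = {"status": "processing"}
        try:
            output = await work()
            self._jobs[request_id] = {"status": "completed", "output": output}
        except Exception as exc:
            # Record the failure so the client sees it instead of polling forever
            self._jobs[request_id] = {"status": "failed", "error": str(exc)}

    def status(self, request_id: str) -> dict[str, Any]:
        """Return the recorded state, or 'unknown' for an unseen ID."""
        return self._jobs.get(request_id, {"status": "unknown"})


async def demo() -> None:
    store = JobStore()

    async def work() -> str:
        await asyncio.sleep(0.01)  # stands in for an agency run
        return "answer"

    await store.run("req-1", work)
    print(store.status("req-1"))


if __name__ == "__main__":
    asyncio.run(demo())
```

In the endpoint, the background task would call `store.run(request_id, ...)` and a separate `GET` route would return `store.status(request_id)`.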
Streaming Responses
Stream responses for a real-time user experience:
import json

from fastapi.responses import StreamingResponse


@app.post("/api/v1/stream")
async def stream_process(request: AgencyRequest):
    """Stream Agency processing results."""

    async def generate_stream():
        """Generate streaming response as server-sent events."""
        # Send initial status
        yield f"data: {json.dumps({'type': 'status', 'message': 'Processing started'})}\n\n"
        try:
            # Execute agency (in real implementation, you'd need streaming-capable agents)
            result = await customer_support_agency.run(
                request.input_data,
                run_context=request.context
            )

            # Send execution path updates
            for step in result.execution_path:
                yield f"data: {json.dumps({'type': 'step', 'step': step})}\n\n"

            # Send final result
            yield f"data: {json.dumps({'type': 'result', 'output': result.output, 'execution_time': result.execution_time})}\n\n"
            yield f"data: {json.dumps({'type': 'done'})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"

    return StreamingResponse(
        generate_stream(),
        # "data: ...\n\n" framing is the server-sent events format
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )
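The endpoint above builds each server-sent event by hand. A small helper keeps the framing in one place: in the SSE format, an event is one or more `data:` lines followed by a blank line. The names `sse_event` and `execution_events` are hypothetical and used only for this sketch.

```python
import json


def sse_event(payload: dict) -> str:
    """Serialize a payload as a single server-sent event.

    json.dumps escapes newlines inside strings, so the payload always
    fits on one data: line.
    """
    return f"data: {json.dumps(payload)}\n\n"


def execution_events(execution_path: list[str], output: str) -> list[str]:
    """Build the event sequence the endpoint emits for a finished run."""
    events = [sse_event({"type": "status", "message": "Processing started"})]
    events += [sse_event({"type": "step", "step": step}) for step in execution_path]
    events.append(sse_event({"type": "result", "output": output}))
    events.append(sse_event({"type": "done"}))
    return events


if __name__ == "__main__":
    for event in execution_events(["routing", "billing_agent"], "Your bill is ready."):
        print(event, end="")
```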
Environment-Specific Configurations
Development Environment
# These examples assume Agency, RouterStrategy, and LangfuseMode are already
# imported; the module that exports LangfuseMode is not shown in this guide.
def create_development_agency():
    """Agency configuration for development."""
    return Agency(
        name="Development Agency",
        strategy=RouterStrategy(),
        context={"environment": "development", "debug": True},
        langfuse_mode=LangfuseMode.LOCAL,  # Use local Langfuse
        auto_flush=True,  # Immediate feedback
        session_id="dev_session"
    )

Testing Environment
def create_test_agency():
    """Agency configuration for testing."""
    return Agency(
        name="Test Agency",
        strategy=RouterStrategy(),
        context={"environment": "test"},
        langfuse_mode=LangfuseMode.DISABLED,  # No external dependencies
        auto_flush=False  # Faster test execution
    )

Production Environment
import os


def create_production_agency():
    """Agency configuration for production."""
    return Agency(
        name="Production Agency",
        strategy=RouterStrategy(),
        context={
            "environment": "production",
            "version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment": os.getenv("DEPLOYMENT_ID")
        },
        langfuse_mode=LangfuseMode.REMOTE,  # Remote Langfuse
        auto_flush=False,  # Better performance
        session_id=None,  # Set per request
        user_id=None  # Set per request
    )
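The three configurations differ mainly in observability mode and flushing, so the choice can be made once at startup from an environment variable. The following is a minimal sketch, assuming a hypothetical `APP_ENV` variable. It uses plain settings dictionaries in place of real `Agency` objects so that it runs on its own, and the mode names are strings standing in for the `LangfuseMode` members.

```python
import os
from typing import Optional

# Per-environment settings mirroring the three factories in this section
ENV_SETTINGS = {
    "development": {"langfuse_mode": "LOCAL", "auto_flush": True},
    "test": {"langfuse_mode": "DISABLED", "auto_flush": False},
    "production": {"langfuse_mode": "REMOTE", "auto_flush": False},
}


def settings_for_environment(env: Optional[str] = None) -> dict:
    """Return settings for env, falling back to APP_ENV, then development."""
    env = (env or os.getenv("APP_ENV", "development")).lower()
    if env not in ENV_SETTINGS:
        # Fail fast on typos rather than silently using the wrong mode
        raise ValueError(
            f"Unknown environment {env!r}; expected one of {sorted(ENV_SETTINGS)}"
        )
    return {"environment": env, **ENV_SETTINGS[env]}


if __name__ == "__main__":
    print(settings_for_environment("production"))
```

Raising on an unknown name is a deliberate choice here: defaulting to development settings in a misconfigured production deployment would be harder to notice.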
Best Practices Summary
Do
- Reuse Agency instances across multiple requests for better performance
- Use appropriate observability modes for each environment
- Provide meaningful context at both agency and run levels
- Implement proper error handling and graceful degradation
- Monitor execution times and optimize slow strategies
- Use descriptive names for agencies, strategies, and execution steps
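For the error-handling item in this list, one common shape for graceful degradation is to bound each call with a timeout and return a fallback message instead of raising. The following is a minimal sketch. `run_with_fallback` is a hypothetical helper, and the coroutine passed to it stands in for a call such as `agency.run(...)`.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

FALLBACK = "Sorry, something went wrong. Please try again."


async def run_with_fallback(call, timeout_s: float = 30.0,
                            fallback: str = FALLBACK) -> str:
    """Await call with a timeout; log failures and return fallback instead."""
    try:
        return await asyncio.wait_for(call, timeout=timeout_s)
    except asyncio.TimeoutError:
        logger.warning("Call timed out after %.1fs", timeout_s)
    except Exception:
        # Log with traceback so the failure is not silent
        logger.exception("Call failed")
    return fallback


async def demo() -> None:
    async def slow() -> str:
        await asyncio.sleep(1)
        return "late answer"

    print(await run_with_fallback(slow(), timeout_s=0.05))


if __name__ == "__main__":
    asyncio.run(demo())
```

The failure is still logged, which keeps this consistent with the advice elsewhere in this guide to avoid silent error handling.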
Avoid
- Creating new agencies per request (performance impact)
- Ignoring execution context (loses valuable debugging information)
- Silent error handling (makes debugging difficult)
- Hardcoding configuration (reduces flexibility)
- Excessive auto-flushing in high-throughput scenarios
Performance Tips
- Use LangfuseMode.DISABLED for high-throughput scenarios
- Disable auto_flush for batch processing
- Implement connection pooling for external services
- Cache frequently used configurations
- Monitor memory usage with long-lived agency instances
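As one way to apply the caching tip, `functools.lru_cache` can memoize configuration loading so that repeated lookups return the same object instead of rebuilding it. The following is a minimal sketch. `AgencySettings` and `load_settings` are hypothetical names, and the body of `load_settings` stands in for whatever file or environment parsing an application does.

```python
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class AgencySettings:
    """Immutable settings, safe to share between callers of the cache."""
    environment: str
    auto_flush: bool


@lru_cache(maxsize=None)
def load_settings(environment: str) -> AgencySettings:
    """Build settings once per environment name; later calls hit the cache."""
    # Expensive work (file reads, environment parsing) would go here.
    return AgencySettings(
        environment=environment,
        auto_flush=(environment == "development"),
    )


if __name__ == "__main__":
    first = load_settings("production")
    second = load_settings("production")
    print(first is second)
    print(load_settings.cache_info())
```

The dataclass is frozen because cached objects are shared: a caller mutating a cached settings object would change it for every later caller.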
Pattern Selection Guide
- Simple Agent Wrapper: Single agent with Agency benefits
  - Use for: Basic chatbots, simple Q&A, single-step processing
- Request Router: Route to specialized agents
  - Use for: Customer support, multi-domain applications, content classification
- Processing Pipeline: Sequential multi-step processing
  - Use for: Document processing, data analysis, content transformation
- Parallel Processing: Multiple agents working simultaneously
  - Use for: Multi-perspective analysis, consensus building, performance optimization
- Multi-Agency Orchestration: Complex business workflows
  - Use for: Decision support systems, complex business processes, enterprise workflows
Common Pitfalls
- Over-engineering: Start simple and add complexity as needed
- Context bloat: Keep context objects focused and relevant
- Synchronous thinking: Leverage async capabilities for better performance
- Ignoring observability: Use the built-in tracing and monitoring features
- Testing shortcuts: Test with realistic scenarios, not just happy paths