Integration Patterns

This guide covers common patterns and best practices for integrating the digitalNXT Agency framework into your applications, from simple single-agent scenarios to complex multi-agency orchestration.

Common Integration Patterns

🎯 Pattern 1: Simple Agent Wrapper

Wrap a single agent to gain the Agency framework's benefits:

from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent

class SimpleAgentStrategy(Strategy):
    """Strategy that wraps a single agent with Agency benefits."""

    def _setup_agents(self):
        self.agent = self.overrides.get('agent') or Agent(
            name="Assistant Agent",
            instructions="You are a helpful AI assistant."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        self._add_to_execution_path("single_agent_execution")

        if self.processor:
            result = await self.processor.run(self.agent, input_data, context)
            return result.final_output
        else:
            # Fallback
            from agents import Runner
            result = await Runner.run(self.agent, input_data, context=context)
            return result.final_output

# Usage
async def simple_agent_example():
    strategy = SimpleAgentStrategy()
    agency = Agency(
        name="Simple Assistant",
        strategy=strategy,
        session_id="simple_session_123"
    )

    result = await agency.run("What is the capital of France?")
    print(f"Response: {result.output}")
    print(f"Execution time: {result.execution_time:.2f}s")

Benefits:

  • ✅ Automatic observability and tracing
  • ✅ Consistent error handling and logging
  • ✅ Session and user tracking
  • ✅ Performance monitoring
  • ✅ Easy upgrade path to more complex patterns

🚦 Pattern 2: Request Router

Route different types of requests to specialized agents:

from agency import Agency
from agency.strategies.router import RouterStrategy

async def request_router_example():
    """Route requests to appropriate specialists."""

    # Use built-in RouterStrategy
    strategy = RouterStrategy()

    agency = Agency(
        name="Customer Support Router",
        strategy=strategy,
        context={
            "company": "ACME Corp",
            "support_level": "tier_1"
        }
    )

    # Different request types get routed automatically
    requests = [
        "I have a question about my monthly bill",
        "The app keeps crashing on my phone",
        "How do I reset my password?",
        "I want to cancel my subscription"
    ]

    for request in requests:
        result = await agency.run(request)
        final_agent = result.extra_data.get('final_agent', 'Unknown')
        print(f"Request: {request[:30]}...")
        print(f"Handled by: {final_agent}")
        print(f"Response: {result.output[:100]}...")
        print(f"Path: {' → '.join(result.execution_path)}")
        print("-" * 50)

Use Cases:

  • Customer support systems
  • Help desk automation
  • Multi-domain chatbots
  • Content classification and routing

🔄 Pattern 3: Multi-Step Processing Pipeline

Chain multiple agents for complex processing:

from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent

class ProcessingPipelineStrategy(Strategy):
    """Multi-step processing with different specialized agents."""

    def _setup_agents(self):
        self.extractor = self.overrides.get('extractor') or Agent(
            name="Data Extractor",
            instructions="Extract key information and entities from the input text."
        )

        self.analyzer = self.overrides.get('analyzer') or Agent(
            name="Content Analyzer",
            instructions="Analyze the extracted information and provide insights."
        )

        self.summarizer = self.overrides.get('summarizer') or Agent(
            name="Content Summarizer",
            instructions="Create a concise summary of the analysis."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        """Execute multi-step processing pipeline."""

        # Step 1: Extract information
        self._add_to_execution_path("information_extraction")
        extraction_result = await self._run_agent(
            self.extractor,
            input_data,
            context
        )

        # Step 2: Analyze extracted information
        self._add_to_execution_path("content_analysis")
        analysis_prompt = f"Analyze this extracted information:\n{extraction_result}"
        analysis_result = await self._run_agent(
            self.analyzer,
            analysis_prompt,
            context
        )

        # Step 3: Summarize analysis
        self._add_to_execution_path("content_summarization")
        summary_prompt = f"Summarize this analysis:\n{analysis_result}"
        final_result = await self._run_agent(
            self.summarizer,
            summary_prompt,
            context
        )

        # Store pipeline metadata
        self.extra_data = {
            "pipeline_stages": 3,
            "extraction_length": len(extraction_result),
            "analysis_length": len(analysis_result),
            "summary_length": len(final_result)
        }

        return final_result

    async def _run_agent(self, agent, input_data: str, context: dict) -> str:
        """Helper method to run agents consistently."""
        if self.processor:
            result = await self.processor.run(agent, input_data, context)
        else:
            from agents import Runner
            result = await Runner.run(agent, input_data, context=context)
        return result.final_output

async def pipeline_example():
    """Process complex content through a multi-step pipeline."""
    strategy = ProcessingPipelineStrategy()

    agency = Agency(
        name="Content Processing Pipeline",
        strategy=strategy,
        context={"domain": "business_analysis"}
    )

    complex_input = """
    Q3 2024 Sales Report: Our revenue increased by 23% compared to Q2,
    reaching $4.2M. The growth was primarily driven by our new product
    line which accounted for 45% of sales. Customer satisfaction scores
    improved to 4.6/5. However, we faced supply chain challenges that
    increased costs by 8%. Our main competitor launched a similar product
    but at a higher price point.
    """

    result = await agency.run(complex_input)

    print("Pipeline Result:")
    print(f"Final Output: {result.output}")
    print(f"Execution Path: {' → '.join(result.execution_path)}")
    print(f"Pipeline Stages: {result.extra_data['pipeline_stages']}")
    print(f"Processing Time: {result.execution_time:.2f}s")

⚡ Pattern 4: Parallel Processing

Execute multiple agents concurrently for different perspectives:

import asyncio

from agency import Agency
from agency.strategies.base import Strategy
from agents import Agent

class ParallelAnalysisStrategy(Strategy):
    """Run multiple agents in parallel and combine results."""

    def _setup_agents(self):
        self.financial_analyst = Agent(
            name="Financial Analyst",
            instructions="Analyze the financial aspects and implications."
        )

        self.risk_analyst = Agent(
            name="Risk Analyst",
            instructions="Identify potential risks and mitigation strategies."
        )

        self.market_analyst = Agent(
            name="Market Analyst",
            instructions="Analyze market trends and competitive landscape."
        )

        self.synthesizer = Agent(
            name="Analysis Synthesizer",
            instructions="Synthesize multiple analyses into a coherent summary."
        )

    async def _execute_strategy(self, input_data: str, context: dict = None) -> str:
        """Execute parallel analysis and synthesis."""

        # Run analysts in parallel
        self._add_to_execution_path("parallel_analysis")

        analysis_tasks = [
            self._run_agent(self.financial_analyst, input_data, context, "financial"),
            self._run_agent(self.risk_analyst, input_data, context, "risk"),
            self._run_agent(self.market_analyst, input_data, context, "market")
        ]

        # Wait for all analyses to complete
        analyses = await asyncio.gather(*analysis_tasks)

        # Combine all analyses
        self._add_to_execution_path("analysis_synthesis")
        combined_analysis = "\n\n".join([
            f"Financial Analysis: {analyses[0]}",
            f"Risk Analysis: {analyses[1]}",
            f"Market Analysis: {analyses[2]}"
        ])

        # Synthesize final result
        synthesis_prompt = f"Synthesize these analyses into a comprehensive summary:\n{combined_analysis}"
        final_result = await self._run_agent(
            self.synthesizer,
            synthesis_prompt,
            context,
            "synthesis"
        )

        # Store parallel execution metadata
        self.extra_data = {
            "parallel_agents": 3,
            "analysis_types": ["financial", "risk", "market"],
            "total_analysis_length": sum(len(a) for a in analyses),
            "synthesis_length": len(final_result)
        }

        return final_result

    async def _run_agent(self, agent, input_data: str, context: dict, analysis_type: str) -> str:
        """Run agent with analysis type tracking."""
        if self.processor:
            result = await self.processor.run(agent, input_data, context)
        else:
            from agents import Runner
            result = await Runner.run(agent, input_data, context=context)

        # Track which analysis completed
        self._add_to_execution_path(f"{analysis_type}_analysis_completed")
        return result.final_output

async def parallel_processing_example():
    """Analyze business scenario from multiple perspectives simultaneously."""
    strategy = ParallelAnalysisStrategy()

    agency = Agency(
        name="Multi-Perspective Business Analyzer",
        strategy=strategy
    )

    business_scenario = """
    TechCorp is considering acquiring StartupX for $50M. StartupX has
    revolutionary AI technology but limited revenue ($2M annually).
    The acquisition would give TechCorp access to 15 key patents and
    a team of 25 AI researchers. However, StartupX has $8M in debt
    and faces a lawsuit over IP infringement.
    """

    result = await agency.run(business_scenario)

    print("Parallel Analysis Result:")
    print(f"Comprehensive Analysis: {result.output}")
    print(f"Execution Path: {result.execution_path}")
    print(f"Parallel Agents Used: {result.extra_data['parallel_agents']}")
    print(f"Analysis Types: {result.extra_data['analysis_types']}")
    print(f"Total Execution Time: {result.execution_time:.2f}s")
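
With a plain `asyncio.gather`, one failing analyst raises out of the whole run. The following is a minimal sketch of tolerating partial failures, under the assumption that a degraded synthesis is preferable to no result. `fake_analyst` and `gather_tolerant` are hypothetical names used for illustration with stand-in coroutines; they are not part of the Agency framework.

```python
import asyncio


async def fake_analyst(name: str, fail: bool = False) -> str:
    """Stand-in for an agent call; a real strategy would await _run_agent here."""
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} analyst unavailable")
    return f"{name} analysis"


async def gather_tolerant(tasks: dict) -> tuple[dict, dict]:
    """Run named coroutines concurrently and split successes from failures."""
    # return_exceptions=True returns raised exceptions as results instead of
    # propagating the first one, so every task gets a chance to finish.
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    succeeded, failed = {}, {}
    for name, outcome in zip(tasks.keys(), results):
        if isinstance(outcome, Exception):
            failed[name] = str(outcome)
        else:
            succeeded[name] = outcome
    return succeeded, failed


async def demo():
    return await gather_tolerant({
        "financial": fake_analyst("financial"),
        "risk": fake_analyst("risk", fail=True),
        "market": fake_analyst("market"),
    })
```

The `failed` mapping could be stored in `extra_data` so that a partial synthesis is visible in traces rather than silently incomplete.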

💾 Pattern 5: Memory-Enabled Conversations

Integrate persistent memory for stateful conversations:

from agency.factories.agency_factory import AgencyFactory
from agency.sessions.session_factory import SessionFactory
from agency.conversations.manager import ConversationManager
# AgencyConfig (used below) must also be imported from the framework;
# its module path is not shown in this guide.

class ConversationalAgencyPattern:
    """Pattern for creating conversational agencies with persistent memory."""

    def __init__(self, base_dir: str = "data"):
        self.base_dir = base_dir
        self.conversation_manager = ConversationManager(
            base_dir=f"{base_dir}/conversations",
            session_dir=f"{base_dir}/sessions"
        )
        self.active_agencies = {}

    async def get_or_create_agency(self, user_id: str, conversation_name: str = None):
        """Get existing agency or create new one with memory."""

        # Reuse the cached agency so follow-up messages share one conversation
        cached = self.active_agencies.get(user_id)
        if cached:
            return cached["agency"], cached["conversation"]

        # Create or load conversation
        conversation = self.conversation_manager.create_conversation(
            name=conversation_name or f"Chat with {user_id}",
            config=AgencyConfig(
                name="ConversationalSupport",
                strategy_type="router"
            )
        )

        # Create session for memory, stored under the configured base directory
        session = SessionFactory.create_file_session(
            session_id=f"user_{user_id}",
            db_path=f"{self.base_dir}/sessions/user_{user_id}.db"
        )

        # Create agency with memory
        agency = AgencyFactory.create_agency(
            config=conversation.config,
            session=session,
            session_id=conversation.session_info.session_id,
            user_id=user_id
        )

        # Cache agency for reuse
        self.active_agencies[user_id] = {
            "agency": agency,
            "conversation": conversation,
            "session": session
        }

        return agency, conversation

    async def chat(self, user_id: str, message: str, context: dict = None):
        """Send message with full conversation context."""

        # Get or create agency
        agency, conversation = await self.get_or_create_agency(user_id)

        # Add user message to conversation
        self.conversation_manager.add_message(
            conversation.id,
            "user",
            message,
            metadata={"user_id": user_id}
        )

        # Process with full context
        result = await agency.run(
            message,
            run_context={
                "conversation_id": conversation.id,
                "user_id": user_id,
                "message_count": len(conversation.messages) + 1,
                **(context or {})
            }
        )

        # Add assistant response to conversation
        self.conversation_manager.add_message(
            conversation.id,
            "assistant",
            result.output,
            metadata={
                "execution_time": result.execution_time,
                "execution_path": result.execution_path
            }
        )

        return result

    async def get_conversation_history(self, user_id: str, limit: int = 10):
        """Get recent conversation history."""
        if user_id in self.active_agencies:
            conversation = self.active_agencies[user_id]["conversation"]
            messages = self.conversation_manager.get_messages(conversation.id, limit=limit)
            return messages
        return []

    def cleanup_inactive_sessions(self, inactive_hours: int = 24):
        """Clean up old sessions."""
        # Implementation would check last activity and clean up
        pass

async def conversational_example():
    """Example of persistent conversational agency."""

    chat_system = ConversationalAgencyPattern()
    user_id = "user_123"

    # First conversation
    print("=== First Conversation ===")
    result1 = await chat_system.chat(
        user_id=user_id,
        message="Hi, I'm planning a trip to Japan. Can you help me?",
        context={"priority": "high"}
    )
    print(f"Assistant: {result1.output}")

    # Second message - should remember context
    print("\n=== Follow-up Message ===")
    result2 = await chat_system.chat(
        user_id=user_id,
        message="What about the weather in Tokyo in April?",
        context={"priority": "medium"}
    )
    print(f"Assistant: {result2.output}")

    # Third message - continuing conversation
    print("\n=== Another Follow-up ===")
    result3 = await chat_system.chat(
        user_id=user_id,
        message="Thanks! Can you recommend some hotels too?",
        context={"priority": "low"}
    )
    print(f"Assistant: {result3.output}")

    # Show conversation history
    print("\n=== Conversation History ===")
    history = await chat_system.get_conversation_history(user_id)
    for i, msg in enumerate(history):
        print(f"{i+1}. [{msg.role}]: {msg.content[:100]}...")

    print(f"\nTotal messages in conversation: {len(history)}")

Benefits:

  • ✅ Persistent conversation memory across sessions
  • ✅ User-specific context and history
  • ✅ Seamless conversation continuity
  • ✅ Full observability of conversation flows
  • ✅ Easy integration with web applications
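
`cleanup_inactive_sessions` is left as a stub in the pattern above. One possible approach, sketched here under the assumption that a per-user last-activity timestamp is enough to decide staleness, is to record access times next to the cached entries and evict anything idle past a cutoff. `ActivityTrackedCache` is a hypothetical helper, not a framework class; the injectable `clock` exists only to make the behavior testable.

```python
import time


class ActivityTrackedCache:
    """Hypothetical cache that evicts entries idle longer than a cutoff."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}    # user_id -> cached value (e.g. agency bundle)
        self._last_seen = {}  # user_id -> last access time in seconds

    def put(self, user_id, value):
        self._entries[user_id] = value
        self._last_seen[user_id] = self._clock()

    def get(self, user_id):
        if user_id in self._entries:
            self._last_seen[user_id] = self._clock()  # refresh on access
        return self._entries.get(user_id)

    def cleanup_inactive(self, inactive_hours: float = 24) -> list:
        """Remove entries not touched within inactive_hours; return their IDs."""
        cutoff = self._clock() - inactive_hours * 3600
        stale = [u for u, seen in self._last_seen.items() if seen < cutoff]
        for user_id in stale:
            del self._entries[user_id]
            del self._last_seen[user_id]
        return stale
```

In `ConversationalAgencyPattern`, a structure like this could stand in for the plain `active_agencies` dictionary, with `cleanup_inactive` called from a periodic task.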

🏢 Pattern 6: Multi-Agency Orchestration

Coordinate multiple agencies for complex workflows:

class MultiAgencyOrchestrator:
    """Orchestrate multiple agencies for complex business processes."""

    def __init__(self):
        # Setup different agencies for different domains
        self.research_agency = Agency(
            name="Research Agency",
            strategy=ProcessingPipelineStrategy(),
            context={"domain": "research"}
        )

        self.analysis_agency = Agency(
            name="Analysis Agency",
            strategy=ParallelAnalysisStrategy(),
            context={"domain": "analysis"}
        )

        self.decision_agency = Agency(
            name="Decision Support Agency",
            strategy=SimpleAgentStrategy(
                agent=Agent(
                    name="Decision Support Agent",
                    instructions="Provide clear decision recommendations based on research and analysis."
                )
            ),
            context={"domain": "decision_support"}
        )

    async def process_business_decision(self, scenario: str, context: dict = None):
        """Process a business decision through multiple agencies."""
        print("🔄 Starting multi-agency processing...")

        # Stage 1: Research
        print("📚 Research phase...")
        research_result = await self.research_agency.run(scenario, context)

        # Stage 2: Analysis
        print("📊 Analysis phase...")
        analysis_input = f"Scenario: {scenario}\n\nResearch: {research_result.output}"
        analysis_result = await self.analysis_agency.run(analysis_input, context)

        # Stage 3: Decision Support
        print("🎯 Decision support phase...")
        decision_input = f"Scenario: {scenario}\n\nResearch: {research_result.output}\n\nAnalysis: {analysis_result.output}"
        decision_result = await self.decision_agency.run(decision_input, context)

        # Combine results
        return {
            "scenario": scenario,
            "research": research_result,
            "analysis": analysis_result,
            "decision": decision_result,
            "total_time": research_result.execution_time + analysis_result.execution_time + decision_result.execution_time,
            "agencies_used": ["research", "analysis", "decision_support"]
        }

async def multi_agency_example():
    """Complex business decision processing across multiple agencies."""
    orchestrator = MultiAgencyOrchestrator()

    scenario = "Should we expand our SaaS business to the European market?"

    result = await orchestrator.process_business_decision(
        scenario=scenario,
        context={"company": "TechSaaS Inc", "budget": "$2M", "timeline": "6 months"}
    )

    print("🏢 Multi-Agency Processing Complete!")
    print(f"Scenario: {result['scenario']}")
    print(f"Total Processing Time: {result['total_time']:.2f}s")
    print(f"Agencies Used: {result['agencies_used']}")
    print("\n📚 Research Output:")
    print(result['research'].output[:200] + "...")
    print("\n📊 Analysis Output:")
    print(result['analysis'].output[:200] + "...")
    print("\n🎯 Decision Recommendation:")
    print(result['decision'].output)

Web Application Integration

🌐 FastAPI Integration

Integrate the Agency framework with FastAPI for web applications:

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import uuid

from agency import Agency
from agency.strategies.router import RouterStrategy

app = FastAPI(title="Agency API")

# Request/Response models
class AgencyRequest(BaseModel):
    input_data: str
    context: Optional[dict] = None
    session_id: Optional[str] = None
    user_id: Optional[str] = None

class AgencyResponse(BaseModel):
    request_id: str
    output: str
    execution_time: float
    execution_path: list[str]
    metadata: dict

# Global agency instances
customer_support_agency = Agency(
    name="API Customer Support",
    strategy=RouterStrategy(),
    context={"service": "api", "version": "1.0"}
)

@app.post("/api/v1/process", response_model=AgencyResponse)
async def process_request(request: AgencyRequest):
    """Process request through Agency framework."""
    request_id = str(uuid.uuid4())

    try:
        # Update agency context with request metadata
        run_context = {
            **(request.context or {}),
            "request_id": request_id,
            "api_version": "v1"
        }

        # Execute through agency
        result = await customer_support_agency.run(
            input_data=request.input_data,
            run_context=run_context
        )

        return AgencyResponse(
            request_id=request_id,
            output=result.output,
            execution_time=result.execution_time,
            execution_path=result.execution_path,
            metadata=result.metadata
        )

    except Exception as e:
        # Chain the original exception so the traceback is preserved in logs
        raise HTTPException(status_code=500, detail=f"Processing failed: {e}") from e

@app.post("/api/v1/process-async")
async def process_request_async(request: AgencyRequest, background_tasks: BackgroundTasks):
    """Process request asynchronously."""
    request_id = str(uuid.uuid4())

    async def background_process():
        """Background processing task."""
        result = await customer_support_agency.run(request.input_data, request.context)
        # Store result in database, send notification, etc.
        print(f"Background task {request_id} completed: {result.execution_time:.2f}s")

    background_tasks.add_task(background_process)

    return {"request_id": request_id, "status": "processing"}

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    processor_info = customer_support_agency.processor.get_service_info()
    return {
        "status": "healthy",
        "agency": "API Customer Support",
        "observability": processor_info["observability_type"],
        "langfuse_mode": processor_info["langfuse_mode"]
    }
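
The `/api/v1/process-async` endpoint returns a `request_id`, but the example leaves open where the result goes. A minimal in-memory sketch of tracking job state so a client could poll for it follows. `JobStore` is a hypothetical helper, not part of the framework, and a real deployment would more likely use a database or queue.

```python
import asyncio
import uuid


class JobStore:
    """Hypothetical in-memory store mapping request IDs to job state."""

    def __init__(self):
        self._jobs = {}

    def create(self) -> str:
        """Register a new job and return its request ID."""
        request_id = str(uuid.uuid4())
        self._jobs[request_id] = {"status": "processing", "output": None, "error": None}
        return request_id

    def get(self, request_id: str):
        """Return the job record, or None for an unknown ID."""
        return self._jobs.get(request_id)

    async def run(self, request_id: str, work):
        """Await `work` (a coroutine) and record success or failure."""
        job = self._jobs[request_id]
        try:
            job["output"] = await work
            job["status"] = "completed"
        except Exception as exc:
            # Record the failure instead of losing it inside a background task
            job["error"] = str(exc)
            job["status"] = "failed"
```

A background task could call `store.run(request_id, agency.run(...))`, and a separate status endpoint could return `store.get(request_id)`.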

🔄 Streaming Responses

Implement streaming responses for a real-time user experience:

from fastapi.responses import StreamingResponse
import json

@app.post("/api/v1/stream")
async def stream_process(request: AgencyRequest):
    """Stream Agency processing results."""

    async def generate_stream():
        """Generate streaming response."""
        # Send initial status
        yield f"data: {json.dumps({'type': 'status', 'message': 'Processing started'})}\n\n"

        try:
            # Execute agency (in a real implementation, you'd need streaming-capable agents)
            result = await customer_support_agency.run(request.input_data, request.context)

            # Send execution path updates
            for step in result.execution_path:
                yield f"data: {json.dumps({'type': 'step', 'step': step})}\n\n"

            # Send final result
            yield f"data: {json.dumps({'type': 'result', 'output': result.output, 'execution_time': result.execution_time})}\n\n"
            yield f"data: {json.dumps({'type': 'done'})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",  # "data: ...\n\n" frames are Server-Sent Events
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )
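
The generator above writes each message as a `data: <json>` frame terminated by a blank line, which is the Server-Sent Events framing. As a rough sketch, the framing can be pulled into a helper and paired with a small parser for tests or non-browser clients. `sse_event` and `parse_sse` are hypothetical names, and the parser only handles the single-line `data:` frames produced here, not the full SSE grammar.

```python
import json


def sse_event(payload: dict) -> str:
    """Serialize one payload as a Server-Sent Events data frame."""
    # json.dumps escapes newlines, so the payload always fits on one line
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(stream_text: str) -> list:
    """Decode frames produced by sse_event back into dictionaries."""
    events = []
    for frame in stream_text.split("\n\n"):
        if frame.startswith("data: "):
            events.append(json.loads(frame[len("data: "):]))
    return events
```

Inside `generate_stream`, each `yield f"data: {json.dumps(...)}\n\n"` line could then become `yield sse_event({...})`.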

Environment-Specific Configurations

🏠 Development Environment

def create_development_agency():
    """Agency configuration for development."""
    return Agency(
        name="Development Agency",
        strategy=RouterStrategy(),
        context={"environment": "development", "debug": True},
        langfuse_mode=LangfuseMode.LOCAL,  # Use local Langfuse
        auto_flush=True,  # Immediate feedback
        session_id="dev_session"
    )

🧪 Testing Environment

def create_test_agency():
    """Agency configuration for testing."""
    return Agency(
        name="Test Agency",
        strategy=RouterStrategy(),
        context={"environment": "test"},
        langfuse_mode=LangfuseMode.DISABLED,  # No external dependencies
        auto_flush=False  # Faster test execution
    )

🚀 Production Environment

import os

def create_production_agency():
    """Agency configuration for production."""
    return Agency(
        name="Production Agency",
        strategy=RouterStrategy(),
        context={
            "environment": "production",
            "version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment": os.getenv("DEPLOYMENT_ID")
        },
        langfuse_mode=LangfuseMode.REMOTE,  # Remote Langfuse
        auto_flush=False,  # Better performance
        session_id=None,  # Set per request
        user_id=None  # Set per request
    )
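
The three factory functions differ mainly in observability mode and flushing. A minimal sketch of choosing between those settings at startup is shown below, assuming the environment name comes from an `APP_ENV` variable (an assumed name, not one the framework defines). Mode names are plain strings here so the sketch runs without the framework installed; real code would pass the corresponding `LangfuseMode` member.

```python
import os

# Values mirror the three factory functions above.
ENVIRONMENT_SETTINGS = {
    "development": {"langfuse_mode": "LOCAL", "auto_flush": True},
    "test": {"langfuse_mode": "DISABLED", "auto_flush": False},
    "production": {"langfuse_mode": "REMOTE", "auto_flush": False},
}


def settings_for_environment(env=None) -> dict:
    """Return settings for `env`, defaulting to the APP_ENV variable (assumed name)."""
    name = (env or os.getenv("APP_ENV", "development")).lower()
    if name not in ENVIRONMENT_SETTINGS:
        # Fail loudly rather than silently falling back to the wrong mode
        raise ValueError(f"Unknown environment: {name!r}")
    return {"environment": name, **ENVIRONMENT_SETTINGS[name]}
```

Rejecting unknown names keeps a typo in deployment configuration from quietly running production traffic with development settings.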

Best Practices Summary

✅ Do

  • Reuse Agency instances across multiple requests for better performance
  • Use appropriate observability modes for each environment
  • Provide meaningful context at both agency and run levels
  • Implement proper error handling and graceful degradation
  • Monitor execution times and optimize slow strategies
  • Use descriptive names for agencies, strategies, and execution steps
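
As an illustration of error handling with graceful degradation, the sketch below wraps an awaitable in a timeout and returns a fallback answer instead of raising. `run_with_fallback` is a hypothetical helper; the timeout value and the shape of the returned dictionary are assumptions for this example, not framework behavior.

```python
import asyncio


async def run_with_fallback(work, timeout_s: float, fallback: str) -> dict:
    """Await `work`; on timeout or error return `fallback` and record why."""
    try:
        output = await asyncio.wait_for(work, timeout=timeout_s)
        return {"output": output, "degraded": False, "reason": None}
    except asyncio.TimeoutError:
        return {"output": fallback, "degraded": True, "reason": "timeout"}
    except Exception as exc:
        # Keep the reason so the failure is visible, not silently swallowed
        return {"output": fallback, "degraded": True, "reason": f"error: {exc}"}
```

A caller might pass `agency.run(message)` as `work` and log any result whose `degraded` flag is set.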

❌ Avoid

  • Creating new agencies per request (performance impact)
  • Ignoring execution context (loses valuable debugging information)
  • Silent error handling (makes debugging difficult)
  • Hardcoding configuration (reduces flexibility)
  • Excessive auto-flushing in high-throughput scenarios

🎯 Performance Tips

  • Use LangfuseMode.DISABLED for high-throughput scenarios
  • Disable auto_flush for batch processing
  • Implement connection pooling for external services
  • Cache frequently used configurations
  • Monitor memory usage with long-lived agency instances

Pattern Selection Guide

Simple Agent Wrapper: Single agent with Agency benefits

  • Use for: Basic chatbots, simple Q&A, single-step processing

Request Router: Route to specialized agents

  • Use for: Customer support, multi-domain applications, content classification

Processing Pipeline: Sequential multi-step processing

  • Use for: Document processing, data analysis, content transformation

Parallel Processing: Multiple agents working simultaneously

  • Use for: Multi-perspective analysis, consensus building, performance optimization

Multi-Agency Orchestration: Complex business workflows

  • Use for: Decision support systems, complex business processes, enterprise workflows

Common Pitfalls

  • Over-engineering: Start simple and add complexity as needed
  • Context bloat: Keep context objects focused and relevant
  • Synchronous thinking: Leverage async capabilities for better performance
  • Ignoring observability: Use the built-in tracing and monitoring features
  • Testing shortcuts: Test with realistic scenarios, not just happy paths
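
To guard against context bloat, one option is to filter each run's context through an allow-list before passing it on. The following is a minimal sketch. `trim_context`, the key names, and the length limit are hypothetical and would need to match what an application's strategies actually read.

```python
def trim_context(context: dict, allowed_keys: set, max_chars: int = 200) -> dict:
    """Keep only allow-listed keys and truncate long string values."""
    trimmed = {}
    for key, value in context.items():
        if key not in allowed_keys:
            continue  # drop anything the strategies do not need
        if isinstance(value, str) and len(value) > max_chars:
            value = value[:max_chars] + "..."
        trimmed[key] = value
    return trimmed
```

For example, `trim_context(request_context, {"user_id", "priority"})` would drop incidental fields such as raw request bodies before they reach traces.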