Agency Framework

The Agency class is the central orchestrator of the digitalNXT Agency framework. It provides a high-level interface for managing AI agent workflows with integrated observability, error handling, and context management.

Core Concepts

🏛️ Agency as Orchestrator

The Agency acts as a central coordinator that:

  • Manages strategy execution with shared context
  • Provides consistent observability across all operations
  • Handles session and user metadata globally
  • Integrates memory management through SQLiteSession
  • Ensures proper error propagation and logging

📋 Strategy Integration

Agencies work with strategies to define execution patterns:

  • Router Strategy: Route requests to specialist agents
  • Sequential Strategy: Execute agents in a specific order
  • Parallel Strategy: Execute multiple agents concurrently
  • Custom Strategies: User-defined execution logic
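To make the three built-in patterns concrete, here is a minimal sketch using plain coroutines. The agent functions and the `route`, `sequential`, and `parallel` helpers are hypothetical stand-ins written for illustration; they are not the framework's RouterStrategy or Agent classes, and the keyword-based routing rule is only an assumption.

```python
import asyncio


# Stand-in "agents": plain coroutines, not the framework's Agent class.
async def billing_agent(text: str) -> str:
    return f"billing({text})"


async def technical_agent(text: str) -> str:
    return f"technical({text})"


async def route(text: str) -> str:
    """Router pattern: pick exactly one specialist based on the input."""
    agent = billing_agent if "invoice" in text.lower() else technical_agent
    return await agent(text)


async def sequential(text: str) -> str:
    """Sequential pattern: feed each agent's output into the next agent."""
    for agent in (billing_agent, technical_agent):
        text = await agent(text)
    return text


async def parallel(text: str) -> list:
    """Parallel pattern: run all agents concurrently and collect results."""
    return await asyncio.gather(billing_agent(text), technical_agent(text))


if __name__ == "__main__":
    print(asyncio.run(route("Invoice question")))
    print(asyncio.run(sequential("x")))
    print(asyncio.run(parallel("x")))
```

The router returns a single answer, the sequential chain returns the last agent's answer, and the parallel variant returns one answer per agent in the order the agents were listed.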

Architecture Overview

graph TD
    Agency[🏛️ Agency] --> Strategy[📋 Strategy]
    Agency --> Processor[⚙️ Processor]
    Agency --> Logger[📝 Centralized Logger]
    Agency --> Session[💾 SQLiteSession]

    Strategy --> |implements| BaseStrategy[📝 Base Strategy]
    Strategy --> Agent1[🤖 Billing Agent]
    Strategy --> Agent2[🤖 Technical Agent]
    Strategy --> Agent3[🤖 Triage Agent]

    Processor --> AsyncExec[🔄 Async Executor]
    Processor --> SyncExec[⏱️ Sync Executor]
    Processor --> LangfuseClient[📈 Langfuse Client]

    Session --> |memory| InMemory[🧠 In-Memory]
    Session --> |persistent| FileDB[💾 File Database]

    Agency --> |returns| StrategyResult[📊 Strategy Result]

    subgraph "Context & Metadata"
        GlobalContext[🌍 Global Context]
        SessionData[👤 Session Data]
        UserData[👤 User Data]
        RunContext[🔄 Run Context]
        Memory[💭 Conversation Memory]
    end

    Agency --> GlobalContext
    Agency --> SessionData
    Agency --> UserData
    Session --> Memory

    classDef agencyClass fill:#f3e5f5
    classDef strategyClass fill:#e8f5e8
    classDef processorClass fill:#fff3e0
    classDef contextClass fill:#e3f2fd

    class Agency,StrategyResult agencyClass
    class Strategy,BaseStrategy,Agent1,Agent2,Agent3 strategyClass
    class Processor,AsyncExec,SyncExec,LangfuseClient processorClass
    class GlobalContext,SessionData,UserData,RunContext contextClass

Agency Class Structure

🔧 Initialization

from agency import Agency
from agency.strategies.router import RouterStrategy
from agency.processors.processor import LangfuseMode
from agency.sessions.session_factory import SessionFactory

# Create session for memory
session = SessionFactory.create_file_session(
    session_id="user_456",
    db_path="data/sessions/user_456.db"
)

# Basic setup with memory
strategy = RouterStrategy()
agency = Agency(
    name="Customer Support Agency",
    strategy=strategy,
    context={"org_id": "123", "environment": "production"},
    metadata={"version": "1.0", "deployment": "us-west-2"},
    langfuse_mode=LangfuseMode.AUTO,
    session_id="session_789",
    user_id="user_456",
    auto_flush=True,
    session=session  # Add session for memory
)

📊 Core Methods

async run() - Execute Strategy

result = await agency.run(
    input_data="I need help with my billing",
    run_context={"priority": "high", "channel": "web"},
    tags=["billing", "customer-support"]  # Optional, maintained for compatibility
)

get_service_info() - System Information

info = agency.processor.get_service_info()
# Returns: service configuration, observability type, session details

Execution Flow

sequenceDiagram
    participant User
    participant Agency
    participant Strategy
    participant Processor
    participant Agents
    participant Observability

    User->>Agency: run(input_data, run_context)

    Agency->>Agency: Setup centralized logging
    Agency->>Agency: Merge contexts (global + run + session/user)
    Agency->>Agency: Start execution timer

    Agency->>Strategy: execute(input_data, merged_context)

    Strategy->>Strategy: Initialize execution path
    Strategy->>Processor: run(agent, input_data, context)

    Processor->>Observability: Create Langfuse span
    Processor->>Agents: Execute via OpenAI Runner
    Agents-->>Processor: Agent result
    Processor->>Observability: Update span with I/O
    Processor-->>Strategy: Execution result

    Strategy->>Strategy: Process & track execution path
    Strategy-->>Agency: StrategyResult

    Agency->>Agency: Add processor metadata
    Agency->>Agency: Calculate total execution time
    Agency->>Agency: Log completion status
    Agency-->>User: Final StrategyResult
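The flow in the diagram can be approximated in a few lines. This is a sketch under stated assumptions, not the framework's implementation: `SketchAgency`, `EchoStrategy`, and `SketchResult` are hypothetical stand-ins, and the logging, processor, and Langfuse steps are left out.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class SketchResult:
    """Hypothetical stand-in for StrategyResult."""
    output: str
    execution_path: list = field(default_factory=list)
    execution_time: float = 0.0
    extra_data: dict = field(default_factory=dict)


class EchoStrategy:
    """Hypothetical strategy that records one step and echoes the input."""

    async def execute(self, input_data: str, context: dict) -> SketchResult:
        return SketchResult(output=f"echo: {input_data}", execution_path=["echo"])


class SketchAgency:
    """Hypothetical stand-in that mirrors the run() steps in the diagram."""

    def __init__(self, name: str, strategy, context=None):
        self.name = name
        self.strategy = strategy
        self.context = context or {}

    async def run(self, input_data: str, run_context=None) -> SketchResult:
        # 1. Merge contexts (global, then per-run, then system keys).
        merged = {**self.context, **(run_context or {}), "agency_name": self.name}
        # 2. Start the execution timer.
        started = time.perf_counter()
        # 3. Delegate to the strategy.
        result = await self.strategy.execute(input_data, merged)
        # 4. Attach metadata and the total execution time.
        result.extra_data["processor"] = {"service_name": self.name}
        result.execution_time = time.perf_counter() - started
        return result


if __name__ == "__main__":
    agency = SketchAgency("Demo", EchoStrategy(), {"org_id": "123"})
    print(asyncio.run(agency.run("hi", {"priority": "high"})))
```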

Context Management

The Agency merges context from several sources into a single dictionary that is passed to the strategy on every run:

🌍 Context Hierarchy

# Context merging: later entries override earlier ones, so run_context
# overrides agency.context and the system keys override both
final_context = {
    **agency.context,          # Global agency context
    **run_context,             # Per-run context
    "agency_name": agency.name, # System context
    "session_id": agency.session_id,
    "user_id": agency.user_id,
}
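The precedence follows from how Python dict unpacking works: when the same key appears more than once, the last value wins. A small self-contained check, where `merge_context` is a hypothetical helper and not part of the framework:

```python
def merge_context(agency_context: dict, run_context: dict, system: dict) -> dict:
    """Merge context layers; keys from later layers replace earlier ones."""
    return {**agency_context, **run_context, **system}


merged = merge_context(
    {"org_id": "123", "priority": "normal"},          # global agency context
    {"priority": "high", "agency_name": "spoofed"},   # per-run context
    {"agency_name": "Customer Support Agency"},       # system context
)
print(merged)
```

Here the run context replaces the global `priority`, and the system `agency_name` replaces the value a caller tried to pass in, so callers cannot overwrite system keys through `run_context`.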

📋 Context Types

| Context Type | Purpose | Scope | Example |
|---|---|---|---|
| Global Context | Organization-wide settings | Agency lifetime | {"org_id": "123", "env": "prod"} |
| Session Context | User session data | Session lifetime | {"session_id": "sess_789"} |
| User Context | User-specific data | User lifetime | {"user_id": "user_456", "preferences": {...}} |
| Run Context | Request-specific data | Single execution | {"priority": "high", "channel": "web"} |

Observability Features

📝 Centralized Logging

# Automatic log formatting with timestamps
# 2024-01-15 14:30:25 - INFO - 🏛️ Agency 'Customer Support' initialized with strategy: RouterStrategy
# 2024-01-15 14:30:25 - INFO - 📊 Observability: langfuse_local
# 2024-01-15 14:30:26 - INFO - 🚀 Agency 'Customer Support' starting run with input: I need help with...
# 2024-01-15 14:30:28 - INFO - ✅ Agency 'Customer Support' completed successfully in 2.34s
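A log line in the shape shown above can be produced with the standard `logging` module. This is a sketch only: the `build_logger` helper and the logger name are assumptions, and the framework's own logger setup may differ.

```python
import io
import logging


def build_logger(name: str, stream) -> logging.Logger:
    """Return a logger that writes 'timestamp - LEVEL - message' lines."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()      # avoid duplicate handlers on repeated calls
    logger.propagate = False     # keep output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()
log = build_logger("agency.sketch", buffer)
log.info("Agency 'Customer Support' initialized with strategy: RouterStrategy")
print(buffer.getvalue(), end="")
```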

📊 Integrated Tracing

  • Langfuse Integration: Automatic trace creation and span management
  • Performance Tracking: Execution time monitoring at agency and strategy levels
  • Error Correlation: Errors are automatically linked to traces for debugging

🎯 Metadata Enrichment

# Automatic metadata added to results
result.extra_data["processor"] = {
    "service_name": "Customer Support Agency",
    "observability_type": "langfuse_local",
    "session_id": "sess_789",
    "user_id": "user_456"
}

Configuration Options

⚙️ Processor Configuration

from agency.processors.processor import LangfuseMode

agency = Agency(
    name="My Agency",
    strategy=my_strategy,
    langfuse_mode=LangfuseMode.AUTO,    # AUTO, LOCAL, REMOTE, DISABLED
    auto_flush=True,                    # Flush traces immediately
    session_id="session_123",           # Optional session tracking
    user_id="user_456"                  # Optional user tracking
)
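One way to keep the mode out of code is to read it from the environment at startup. The sketch below uses a stand-in enum that only mirrors the four mode names listed above; its string values, the `mode_from_env` helper, and the `AGENCY_LANGFUSE_MODE` variable name are all assumptions made for illustration.

```python
import os
from enum import Enum


class SketchLangfuseMode(Enum):
    """Stand-in mirroring the four mode names; the values are assumed."""
    AUTO = "auto"
    LOCAL = "local"
    REMOTE = "remote"
    DISABLED = "disabled"


def mode_from_env(environ=os.environ, var="AGENCY_LANGFUSE_MODE"):
    """Map an environment variable to a mode, falling back to AUTO."""
    raw = environ.get(var, "auto").strip().lower()
    try:
        return SketchLangfuseMode(raw)
    except ValueError:
        # Unknown values fall back to AUTO instead of failing at startup.
        return SketchLangfuseMode.AUTO


print(mode_from_env({"AGENCY_LANGFUSE_MODE": "DISABLED"}))
```

With the real enum, the selected member would be passed as `langfuse_mode=...` when constructing the Agency.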

🔧 Strategy Integration

# Strategies automatically receive the processor
class CustomStrategy(Strategy):
    def _setup_agents(self):
        self.my_agent = Agent(name="Custom Agent")

    async def _execute_strategy(self, input_data: str, context: dict):
        # Processor is automatically available as self.processor
        # Logger is automatically available as self.logger
        self.logger.info("Executing custom strategy")
        return f"Processed: {input_data}"

# Agency automatically calls strategy.set_processor()
agency = Agency(name="Custom Agency", strategy=CustomStrategy())

Error Handling

🛡️ Comprehensive Error Management

try:
    result = await agency.run("problematic input")
except Exception as e:
    # Agency automatically logs:
    # ❌ Agency 'My Agency' failed: <error details>
    # ⏱️  Failed after 1.23s
    # 📍 Execution path: ['step1', 'step2']
    print(f"Agency execution failed: {e}")

📊 Error Context

  • Execution Path Tracking: See exactly where failures occurred
  • Timing Information: Understand performance impact of failures
  • Context Preservation: Full context available for debugging
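On the caller's side, the same ideas (timing and a controlled fallback) can be applied around any `run()`-style coroutine. In this sketch `run_with_fallback`, `failing_run`, and `ok_run` are hypothetical names, and the fallback message is a placeholder.

```python
import asyncio
import logging
import time

logger = logging.getLogger("agency.sketch.errors")


async def run_with_fallback(run, input_data, fallback="Sorry, something went wrong."):
    """Await run(input_data); on failure, log the elapsed time and return a fallback."""
    started = time.perf_counter()
    try:
        return await run(input_data)
    except Exception as exc:
        elapsed = time.perf_counter() - started
        logger.error("run failed after %.2fs: %s", elapsed, exc)
        return fallback


async def failing_run(_input_data):
    """Stand-in for an agency run that raises."""
    raise RuntimeError("boom")


async def ok_run(input_data):
    """Stand-in for an agency run that succeeds."""
    return input_data.upper()


if __name__ == "__main__":
    print(asyncio.run(run_with_fallback(ok_run, "hello")))
    print(asyncio.run(run_with_fallback(failing_run, "hello")))
```

Catching `Exception` rather than using a bare `except` keeps `KeyboardInterrupt` and task cancellation working as expected.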

Memory Management

💾 Session Integration

The Agency framework uses SQLiteSession from the OpenAI Agents SDK for persistent conversation memory:

from agency.sessions.session_factory import SessionFactory
from agency.factories.agency_factory import AgencyFactory

# Create persistent session
session = SessionFactory.create_file_session(
    session_id="user_123",
    db_path="data/sessions/user_123.db"
)

# Create agency with memory
agency = AgencyFactory.create_agency(
    config=agency_config,
    session=session,
    user_id="user_123"
)

# Conversations are automatically persisted
result = await agency.run("Hello, remember our previous conversation?")

🧠 Memory Types

| Memory Type | Use Case | Persistence | Performance |
|---|---|---|---|
| In-Memory | Testing, temporary sessions | No | Fastest |
| File-based | Production, long conversations | Yes | Good |
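The persistence difference between the two memory types can be seen with the standard `sqlite3` module alone. This sketch does not use SessionFactory or SQLiteSession; the `messages` table and the `save_and_reopen` helper are hypothetical and only illustrate how an in-memory database differs from a file-backed one.

```python
import os
import sqlite3
import tempfile


def save_and_reopen(db_path: str) -> int:
    """Write one message, close the connection, reopen, and count the rows."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS messages (role TEXT, content TEXT)")
    conn.execute("INSERT INTO messages VALUES (?, ?)", ("user", "Hello"))
    conn.commit()
    conn.close()

    # A new connection stands in for a later process or a later session.
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS messages (role TEXT, content TEXT)")
    count = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    conn.close()
    return count


in_memory_rows = save_and_reopen(":memory:")   # data is gone after close
with tempfile.TemporaryDirectory() as tmp:
    file_rows = save_and_reopen(os.path.join(tmp, "session.db"))  # data survives

print(in_memory_rows, file_rows)
```

An in-memory database disappears when its connection closes, which is why it suits tests and short-lived sessions, while the file-backed database keeps the conversation between connections.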

💭 Conversation Management

from agency.conversations.manager import ConversationManager

# High-level conversation API
manager = ConversationManager()
conversation = manager.create_conversation(
    name="Customer Support",
    config=agency_config
)

# Agency automatically uses conversation context
agency = AgencyFactory.create_agency_from_conversation(conversation)

For detailed memory management documentation, see Memory Management.

Best Practices

Do

# Use descriptive agency names
agency = Agency(name="Customer Support Bot", strategy=strategy)

# Provide meaningful context
context = {
    "org_id": "acme-corp",
    "environment": "production",
    "service_tier": "premium"
}

# Use structured run context
run_context = {
    "request_id": "req_123",
    "priority": "high",
    "channel": "web"
}

# Handle errors appropriately
import logging

logger = logging.getLogger(__name__)

try:
    result = await agency.run(input_data, run_context)
    logger.info(f"Success: {result.output}")
except Exception as e:
    logger.error(f"Agency execution failed: {e}")
    # Implement fallback behavior

Avoid

# Don't use generic names
agency = Agency(name="Agent", strategy=strategy)  # Too generic

# Don't ignore context
result = await agency.run(input_data)  # Missing valuable context

# Don't catch and ignore errors
try:
    result = await agency.run(input_data)
except:
    pass  # Swallowing errors loses valuable debugging info

Example: Complete Agency Setup

import asyncio
from agency import Agency
from agency.strategies.router import RouterStrategy
from agency.processors.processor import LangfuseMode

async def main():
    # Initialize strategy
    strategy = RouterStrategy()

    # Create agency with full configuration
    agency = Agency(
        name="Enterprise Customer Support",
        strategy=strategy,
        context={
            "organization": "ACME Corp",
            "environment": "production",
            "service_tier": "enterprise",
            "region": "us-west-2"
        },
        metadata={
            "version": "2.1.0",
            "deployment_id": "deploy_789",
            "build_number": "1234"
        },
        langfuse_mode=LangfuseMode.AUTO,
        session_id="sess_customer_123",
        user_id="user_premium_456",
        auto_flush=True
    )

    # Execute with rich context
    result = await agency.run(
        input_data="I'm having trouble accessing my premium features",
        run_context={
            "request_id": "req_urgent_789",
            "priority": "high",
            "channel": "web_portal",
            "user_tier": "premium",
            "previous_interactions": 3
        }
    )

    # Process result
    print(f"Response: {result.output}")
    print(f"Execution time: {result.execution_time:.2f}s")
    print(f"Path taken: {' → '.join(result.execution_path)}")
    print(f"Final agent: {result.extra_data.get('final_agent', 'Unknown')}")

if __name__ == "__main__":
    asyncio.run(main())

Agency Lifecycle

Each Agency instance is designed to be long-lived and can handle multiple run() calls. The processor and strategy are initialized once and reused across executions for optimal performance.

Performance Optimization

  • Reuse Agency instances across multiple runs
  • Use appropriate Langfuse modes for your environment
  • Consider disabling auto_flush for high-throughput scenarios
  • Leverage context caching for repeated similar requests
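The first point, reusing one instance, might look like the sketch below. `CountingAgency` is a hypothetical stand-in with an async `run()`, and `handle_batch` with its `max_concurrency` parameter is an assumption; whether the real Agency is safe to call concurrently is not stated here and should be verified before copying this pattern.

```python
import asyncio


class CountingAgency:
    """Hypothetical stand-in exposing an async run(); counts constructions."""
    instances = 0

    def __init__(self):
        CountingAgency.instances += 1

    async def run(self, input_data, run_context=None):
        await asyncio.sleep(0)  # yield control, as a real network call would
        return f"handled: {input_data}"


async def handle_batch(agency, inputs, max_concurrency=4):
    """Run many inputs through one shared agency with bounded concurrency."""
    limit = asyncio.Semaphore(max_concurrency)

    async def one(text):
        async with limit:
            return await agency.run(text)

    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*(one(text) for text in inputs))


shared = CountingAgency()  # built once, reused for every request
results = asyncio.run(handle_batch(shared, ["a", "b", "c"]))
print(results, CountingAgency.instances)
```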