Temporal Workflows
Temporal is a workflow orchestration platform that brings reliability, scalability, and visibility to the digitalNXT Agency framework. It enables durable execution of complex agent strategies with built-in fault tolerance and retry mechanisms.
What is Temporal?
Temporal is an open-source workflow orchestration platform that ensures your code runs reliably, even in the face of failures. It provides:
- Durable Execution: Workflows automatically resume from the last checkpoint after crashes
- Built-in Retries: Configurable retry policies for transient failures
- Visibility: Complete history of workflow execution
- Scalability: Horizontal scaling through worker pools
- Language Agnostic: SDKs for multiple programming languages
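Durable execution is the property the rest of this page relies on. The stdlib-only toy below shows the core idea. Completed steps have their results recorded. When the workflow code is re-run after a crash, recorded results are returned instead of redoing the work. The names `ReplayingContext` and `pipeline` are illustrative only. Real Temporal replay is driven by a server-side event history, not an in-memory list.

```python
from typing import Any, Callable, List


class ReplayingContext:
    """Toy durable-execution context: replays recorded results, runs only new steps."""

    def __init__(self, history: List[Any]) -> None:
        self.history = history          # results recorded by earlier attempts
        self.position = 0               # index of the next step in the history
        self.executed: List[str] = []   # steps actually run during this attempt

    def execute_activity(self, name: str, fn: Callable[[], Any]) -> Any:
        if self.position < len(self.history):
            # Step completed before the crash: reuse the recorded result.
            result = self.history[self.position]
        else:
            # New step: run it and record its result.
            result = fn()
            self.history.append(result)
            self.executed.append(name)
        self.position += 1
        return result


def pipeline(ctx: ReplayingContext) -> str:
    # Deterministic workflow logic: the same calls, in the same order, on every run.
    category = ctx.execute_activity("classify", lambda: "billing")
    return ctx.execute_activity("answer", lambda: f"route to {category}")


# Simulate a crash after "classify" finished: only "answer" is executed on recovery.
ctx = ReplayingContext(history=["billing"])
print(pipeline(ctx))  # route to billing
print(ctx.executed)   # ['answer']
```

This is also why workflow code must be deterministic. Replay only lines up with the recorded history if the same calls happen in the same order.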
Why Temporal for Agent Orchestration?
Traditional agent execution faces several challenges that Temporal addresses:
Without Temporal
- Agent failures require manual intervention
- Complex retry logic must be hand-coded
- No visibility into long-running operations
- Difficult to scale agent execution
- State management is complex
With Temporal
- Automatic recovery from failures
- Declarative retry policies
- Full execution history and debugging
- Horizontal scaling via workers
- State automatically persisted
Core Concepts
Workflows
Workflows are the orchestration logic that coordinates activities. In Agency, we use a single GenericStrategyWorkflow that can execute any registered strategy.
from datetime import timedelta
from typing import Optional
from temporalio import workflow
from temporalio.common import RetryPolicy
# StrategyResult and the execute_strategy_directly activity come from the Agency codebase
@workflow.defn
class GenericStrategyWorkflow:
    """Universal workflow for executing any Agency strategy."""
    @workflow.run
    async def run(
        self,
        strategy_name: str,
        input_data: str,
        processor_config: dict,
        strategy_overrides: Optional[dict] = None,
        context: Optional[dict] = None,
        timeout_seconds: int = 120,
    ) -> StrategyResult:
        """Execute strategy with full observability and fault tolerance."""
        workflow.logger.info(f"Starting workflow for strategy: {strategy_name}")
        result = await workflow.execute_activity(
            execute_strategy_directly,
            args=[strategy_name, input_data, processor_config, strategy_overrides, context],
            start_to_close_timeout=timedelta(seconds=timeout_seconds),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
                maximum_attempts=3,
            ),
        )
        return result
Activities
Activities are the actual units of work - in our case, executing agent strategies. Activities can be retried, have timeouts, and are where the real processing happens.
from typing import Optional
from temporalio import activity
# StrategyRegistry, Processor, LangfuseMode and StrategyResult come from the Agency codebase
@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    strategy_overrides: Optional[dict] = None,
    context: Optional[dict] = None,
) -> StrategyResult:
    """Execute any registered strategy with full observability."""
    # Get strategy from registry (overrides may be None, so fall back to no kwargs)
    strategy_class = StrategyRegistry.get(strategy_name)
    strategy = strategy_class(**(strategy_overrides or {}))
    # Create processor with configuration
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
    )
    # Execute strategy
    strategy.set_processor(processor)
    result = await strategy.execute(input_data, context)
    return result
Workers
Workers are processes that execute workflows and activities. They poll task queues for work and execute it.
# Start a worker that can handle any strategy
worker_manager = TemporalWorkerManager(task_queue="agency-task-queue")
worker = await worker_manager.start_worker(
temporal_address="localhost:7233",
max_concurrent_activities=10
)
await worker.run()
Task Queues
Task queues are named queues that route work to specific workers. This enables:
- Load balancing across multiple workers
- Specialized workers for different strategy types
- Versioning and gradual rollouts
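Routing comes down to choosing a queue name when a workflow is started. Only workers polling that exact name receive the work. The sketch below is minimal. The strategy-to-queue mapping and the `-canary` suffix convention are hypothetical choices, not something Agency defines.

```python
from typing import Dict

# Hypothetical mapping from strategy name to a specialized task queue.
SPECIALIZED_QUEUES: Dict[str, str] = {
    "map_reduce": "gpu-intensive-queue",
}
DEFAULT_QUEUE = "agency-task-queue"


def task_queue_for(strategy_name: str, canary: bool = False) -> str:
    """Pick the task queue a workflow for this strategy should be started on."""
    queue = SPECIALIZED_QUEUES.get(strategy_name, DEFAULT_QUEUE)
    # A separate queue served only by new-version workers allows a gradual rollout.
    return f"{queue}-canary" if canary else queue


print(task_queue_for("router"))               # agency-task-queue
print(task_queue_for("map_reduce"))           # gpu-intensive-queue
print(task_queue_for("router", canary=True))  # agency-task-queue-canary
```

The returned name is what would be passed as `task_queue=` to `client.execute_workflow`. Each worker pool is started with the matching queue name.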
Architecture Overview
graph TB
subgraph "Client Application"
App[Application Code]
end
subgraph "Temporal Server"
TServer[Temporal Server]
TaskQueue[Task Queue: agency-task-queue]
History[Workflow History]
end
subgraph "Worker Pool"
W1[Worker 1]
W2[Worker 2]
W3[Worker N...]
end
subgraph "Agency Framework"
Registry[Strategy Registry]
Strategies[Registered Strategies]
Processor[Processor with Observability]
end
App -->|Start Workflow| TServer
TServer --> TaskQueue
TaskQueue -->|Poll for Work| W1
TaskQueue -->|Poll for Work| W2
TaskQueue -->|Poll for Work| W3
W1 --> Registry
W2 --> Registry
W3 --> Registry
Registry --> Strategies
Strategies --> Processor
W1 -->|Update Status| History
W2 -->|Update Status| History
W3 -->|Update Status| History
classDef temporal fill:#e3f2fd
classDef worker fill:#e8f5e8
classDef agency fill:#fff3e0
class TServer,TaskQueue,History temporal
class W1,W2,W3 worker
class Registry,Strategies,Processor agency
Strategy Registry Pattern
The Strategy Registry eliminates code duplication by providing a single workflow and activity that can execute any strategy:
Registering Strategies
from agency.temporal.core import register_strategy
from agency.strategies import RouterStrategy, SequentialStrategy, MapReduceStrategy
# Register the built-in strategies at startup.
# register_strategy(name) is a class decorator, so applying it to an existing class
# is equivalent to writing @register_strategy("router") above its definition.
register_strategy("router")(RouterStrategy)
register_strategy("sequential")(SequentialStrategy)
register_strategy("map_reduce")(MapReduceStrategy)
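The pattern behind a name-based registry is small. This page does not show the internals of Agency's `StrategyRegistry`. The self-contained sketch below (`SketchRegistry` and `EchoStrategy` are hypothetical names) illustrates the same idea. A decorator records each class under a name. A generic caller looks the class up by name and instantiates it with optional constructor overrides.

```python
from typing import Callable, Dict


class SketchRegistry:
    """Minimal name -> class registry; an illustration, not Agency's StrategyRegistry."""

    def __init__(self) -> None:
        self._classes: Dict[str, type] = {}

    def register(self, name: str) -> Callable[[type], type]:
        """Class decorator: record the class under `name` and return it unchanged."""
        def decorator(cls: type) -> type:
            if name in self._classes:
                raise ValueError(f"strategy {name!r} is already registered")
            self._classes[name] = cls
            return cls
        return decorator

    def get(self, name: str) -> type:
        if name not in self._classes:
            raise KeyError(f"unknown strategy {name!r}; known: {sorted(self._classes)}")
        return self._classes[name]


registry = SketchRegistry()


@registry.register("echo")
class EchoStrategy:
    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix

    def execute(self, input_data: str) -> str:
        return self.prefix + input_data


# A generic activity needs only the name plus optional constructor overrides.
overrides = {"prefix": ">> "}
strategy = registry.get("echo")(**overrides)
print(strategy.execute("hello"))  # >> hello
```

With this design, one workflow and one activity definition cover every strategy. Adding a strategy means registering one more class, not writing another workflow.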
Executing via Temporal
import uuid
from temporalio.client import Client
from agency.temporal.core import GenericStrategyWorkflow
# Connect to Temporal
client = await Client.connect("localhost:7233")
# Execute any registered strategy
result = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=[
        "router",  # Strategy name
        "Process this customer inquiry",  # Input
        {
            "service_name": "Customer Support",
            "langfuse_mode": "auto",
            "session_id": "session_123",
            "user_id": "user_456",
        },  # Processor config
        {"model": "gpt-4"},  # Strategy overrides
        {"source": "api", "priority": "high"},  # Context
    ],
    id=f"customer-inquiry-{uuid.uuid4()}",
    task_queue="agency-task-queue",
)
Execution Modes Comparison
Direct Execution
Traditional in-process execution without Temporal:
# Direct execution - no fault tolerance
strategy = RouterStrategy()
processor = Processor()
strategy.set_processor(processor)
result = await strategy.execute("input", context)
Pros:
- Simple and straightforward
- Lower latency
- No infrastructure required
Cons:
- No automatic retries
- No fault tolerance
- Limited visibility
- Difficult to scale
Temporal Execution
Durable execution through Temporal:
# Temporal execution - full fault tolerance
result = await client.execute_workflow(
GenericStrategyWorkflow.run,
args=["router", "input", processor_config],
id=workflow_id,
task_queue="agency-task-queue",
)
Pros:
- Automatic retries and fault tolerance
- Complete execution history
- Horizontal scaling
- Long-running operation support
- Built-in timeouts
Cons:
- Additional infrastructure (Temporal server)
- Slightly higher latency
- Learning curve
Benefits for Complex Strategies
Fault Tolerance
Temporal automatically handles failures:
# Configure retry policy for transient failures
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
maximum_attempts=3,
backoff_coefficient=2.0,
)
# Workflows resume from their recorded event history:
# if a worker crashes, another worker picks up the workflow and replays it
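To see how these settings translate into wait times, the sketch below models the retry interval growth Temporal documents. Each retry waits `initial_interval * backoff_coefficient ** n`, capped at `maximum_interval`. `maximum_attempts` counts the first try. The function name `retry_delays` is hypothetical. The sketch assumes a positive `maximum_attempts`, since in Temporal `0` means unlimited.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> List[timedelta]:
    """Delay before each retry: initial * coefficient**n, capped at maximum_interval."""
    delays = []
    # maximum_attempts includes the first try, so there are maximum_attempts - 1 retries.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


one_s, ten_s = timedelta(seconds=1), timedelta(seconds=10)
print([d.total_seconds() for d in retry_delays(one_s, 2.0, ten_s, 3)])  # [1.0, 2.0]
print([d.total_seconds() for d in retry_delays(one_s, 2.0, ten_s, 6)])  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

With the policy above (`maximum_attempts=3`), a failing activity is tried three times with waits of 1 s and 2 s in between. The 10 s cap only starts to matter once more attempts are allowed.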
Scalability
Scale horizontally by adding more workers:
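# Run one worker process per machine (or container); each polls the same task queue
worker = await worker_manager.start_worker(
    temporal_address="temporal-server:7233",
    max_concurrent_activities=10,
)
await worker.run()  # Blocks until the worker shuts down
# To run several workers in one process instead, run them concurrently:
# workers = [await worker_manager.start_worker(...) for _ in range(num_workers)]
# await asyncio.gather(*(w.run() for w in workers))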
Visibility
Full execution history in Temporal UI:
- View all workflow executions
- See detailed execution timeline
- Inspect input/output of each activity
- Debug failures with stack traces
- Monitor performance metrics
Long-Running Operations
Well suited to complex agent chains:
# Workflows can run for hours, days, or longer
result = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    # The sixth argument (timeout_seconds) lifts the activity's 120 s default to match
    args=["complex_analysis", large_dataset, config, None, None, 24 * 60 * 60],
    id="long-running-analysis",
    task_queue="agency-task-queue",
    execution_timeout=timedelta(hours=24),  # Workflow may run for up to 24 hours
)
Workflow Versioning
Safely deploy changes without disrupting running workflows:
# The Python SDK versions workflow code with patching (workflow.patched)
if workflow.patched("strategy_update_v2"):
    # New logic: new executions, and replays of histories that recorded this patch
    result = await new_strategy_execution()
else:
    # Old logic: replays of executions started before the patch was deployed
    result = await old_strategy_execution()
Session and Context Propagation
Context is passed explicitly as a workflow argument and forwarded to the activity, so it travels with each execution across workers:
# Context flows through the entire workflow
context = {
"workflow_id": workflow.info().workflow_id,
"workflow_run_id": workflow.info().run_id,
"execution_context": "temporal_activity",
"execution_mode": "temporal",
"source": "api",
"user_id": "user_123"
}
# Sessions are recreated in activities
session = SessionFactory.create_user_session(
user_id=session_config.get("user_id"),
base_dir="temp_sessions"
)
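Because `context` (like `processor_config`) is passed as a workflow and activity argument, Temporal's data converter serializes it. The default converter encodes plain Python values as JSON. A cheap guard before starting a workflow is to confirm the dict survives a JSON round trip unchanged. The helper `check_json_payload` is hypothetical and deliberately stricter than Temporal's converter, which can also encode dataclasses, for example.

```python
import json
from typing import Any, Dict


def check_json_payload(name: str, value: Dict[str, Any]) -> Dict[str, Any]:
    """Fail fast if `value` would not survive a JSON round trip unchanged."""
    try:
        round_tripped = json.loads(json.dumps(value))
    except TypeError as exc:
        raise TypeError(f"{name} is not JSON-serializable: {exc}") from exc
    if round_tripped != value:
        # e.g. tuples come back as lists and int keys come back as strings
        raise ValueError(f"{name} changes shape when encoded as JSON")
    return value


context = {"source": "api", "user_id": "user_123", "priority": "high"}
check_json_payload("context", context)  # passes unchanged

try:
    check_json_payload("context", {"retries": (1, 2)})
except ValueError as exc:
    print(exc)  # context changes shape when encoded as JSON
```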
Error Handling
Activity Failures
Activities automatically retry with exponential backoff:
from datetime import timedelta
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
try:
    result = await workflow.execute_activity(
        execute_strategy_directly,
        args=[strategy_name, input_data, config],
        start_to_close_timeout=timedelta(seconds=120),  # This or schedule_to_close is required
        retry_policy=RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=10),
            maximum_attempts=3,
            non_retryable_error_types=["ValueError", "TypeError"],
        ),
    )
except ActivityError as e:
    workflow.logger.error(f"Activity failed after retries: {e}")
    # Implement compensation logic or alternative strategy
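The interplay between `maximum_attempts` and `non_retryable_error_types` can be modeled in a few lines. This is a toy, synchronous stand-in for what Temporal does for you. It has no backoff sleeps and no `ActivityError` wrapping. `run_with_retries`, `flaky`, and `bad_input` are hypothetical names for illustration. Like the policy above, the model matches errors by exception class name.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    maximum_attempts: int,
    non_retryable_error_types: Sequence[str] = (),
) -> Tuple[T, int]:
    """Call fn until it succeeds; return (result, attempts). Backoff sleeps are omitted."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn(), attempt
        except Exception as exc:
            # Match on the exception's class name, as non_retryable_error_types does.
            non_retryable = type(exc).__name__ in non_retryable_error_types
            if non_retryable or attempt >= maximum_attempts:
                raise


flaky_calls: List[int] = []


def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    flaky_calls.append(1)
    if len(flaky_calls) < 3:
        raise ConnectionError("transient network error")
    return "ok"


bad_calls: List[int] = []


def bad_input() -> str:
    """Always fails with an error listed as non-retryable below."""
    bad_calls.append(1)
    raise ValueError("unknown strategy name")


print(run_with_retries(flaky, maximum_attempts=3))  # ('ok', 3)
try:
    run_with_retries(bad_input, maximum_attempts=3, non_retryable_error_types=["ValueError"])
except ValueError as exc:
    print(f"gave up after {len(bad_calls)} attempt(s): {exc}")
```

Transient errors consume attempts until the budget runs out. A non-retryable type fails on the first attempt, which is what you want for bad input that no amount of retrying will fix.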
Monitoring and Alerting
# Inside workflow code: add custom metrics
workflow.metric_meter().create_counter("strategies_executed").add(1)
workflow.metric_meter().create_histogram("execution_duration").record(duration_ms)  # integer value
# In client code: check the workflow's status
from temporalio.client import WorkflowExecutionStatus
handle = client.get_workflow_handle(workflow_id)
description = await handle.describe()
if description.status == WorkflowExecutionStatus.FAILED:
    # Trigger alert (notify_ops_team is your own alerting hook)
    notify_ops_team(workflow_id, description.status)
Agency Configuration for Temporal
The Agency framework integrates with Temporal through a single API. You can configure an Agency for direct execution, Temporal workflow execution, or both.
Agency Architecture with Temporal
graph TB
subgraph "Agency Layer"
AgencyClass[Agency Class]
Config[Agency Config]
Session[SQLiteSession]
end
subgraph "Execution Layer"
DirectMode[Direct Execution]
TemporalMode[Temporal Execution]
end
subgraph "Strategy Layer"
Strategy[Strategy Instance]
Registry[Strategy Registry]
end
subgraph "Processor Layer"
Processor[Processor with Observability]
LangfuseClient[Langfuse Client]
end
subgraph "Temporal Infrastructure"
WorkflowExec[Workflow Execution]
ActivityExec[Activity Execution]
Workers[Temporal Workers]
end
AgencyClass --> Config
AgencyClass --> Session
AgencyClass --> DirectMode
AgencyClass --> TemporalMode
DirectMode --> Strategy
TemporalMode --> Registry
Strategy --> Processor
Registry --> ActivityExec
ActivityExec --> Processor
Processor --> LangfuseClient
TemporalMode --> WorkflowExec
WorkflowExec --> Workers
Workers --> ActivityExec
classDef agency fill:#e3f2fd
classDef execution fill:#fff3e0
classDef strategy fill:#e8f5e8
classDef processor fill:#fce4ec
classDef temporal fill:#f3e5f5
class AgencyClass,Config,Session agency
class DirectMode,TemporalMode execution
class Strategy,Registry strategy
class Processor,LangfuseClient processor
class WorkflowExec,ActivityExec,Workers temporal
Basic Agency Configuration
Direct Execution (Backward Compatible)
from agency import Agency, RouterStrategy
from agency.sessions.session_factory import SessionFactory
# Create a session for conversation memory
session = SessionFactory.create_file_session(
session_id="user_123_session",
db_path="data/sessions/user_123.db"
)
# Traditional direct execution agency
agency = Agency(
name="Customer Support Agency",
strategy=RouterStrategy(), # Direct strategy instance
session=session,
user_id="user_123",
langfuse_mode=LangfuseMode.LOCAL
)
# This runs directly without Temporal
result = await agency.run("I need help with my billing")
Temporal-Enabled Agency
from agency import Agency
from agency.temporal.core import register_strategy
from agency.strategies import RouterStrategy
# Register the strategy for Temporal execution
# (equivalent to decorating the class with @register_strategy("router"))
register_strategy("router")(RouterStrategy)
# Create Temporal-enabled agency
agency = Agency(
name="Customer Support Agency",
strategy_name="router", # Use registered strategy name
temporal_config={
"enabled": True,
"temporal_address": "localhost:7233",
"task_queue": "customer-support-queue",
"timeout_seconds": 300
},
session=session,
user_id="user_123",
langfuse_mode=LangfuseMode.LOCAL
)
# This runs via Temporal workflow
result = await agency.run("Complex multi-step inquiry", use_temporal=True)
Hybrid Agency (Both Modes)
# Agency supporting both execution modes
agency = Agency(
name="Customer Support Agency",
strategy=RouterStrategy(), # For direct execution
strategy_name="router", # For Temporal execution
temporal_config={"enabled": True},
session=session,
user_id="user_123"
)
# Choose execution mode at runtime
quick_result = await agency.run("Simple question") # Direct execution
complex_result = await agency.run("Complex workflow", use_temporal=True) # Temporal
Configuration Options
Temporal Configuration
temporal_config = {
"enabled": True,
"temporal_address": "localhost:7233", # Temporal server address
"task_queue": "agency-task-queue", # Task queue for workflows
"timeout_seconds": 300, # Activity timeout
"retry_policy": { # Custom retry policy
"initial_interval": 1,
"maximum_interval": 10,
"maximum_attempts": 3,
"backoff_coefficient": 2.0
},
"auto_start_worker": True, # Auto-manage workers
"max_concurrent_activities": 10 # Worker concurrency
}
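The `retry_policy` section uses plain numbers, while Temporal's `RetryPolicy` takes `timedelta` intervals. The sketch below shows one way to map the section onto `RetryPolicy`-style keyword arguments. It assumes the interval values are in seconds, which this page does not state explicitly. The helper `retry_policy_kwargs` is hypothetical and not part of Agency.

```python
from datetime import timedelta
from typing import Any, Dict


def retry_policy_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Map temporal_config["retry_policy"] onto RetryPolicy-style keyword arguments.

    Assumption: interval values are given in seconds.
    """
    retry = config.get("retry_policy", {})
    kwargs: Dict[str, Any] = {}
    for key in ("initial_interval", "maximum_interval"):
        if key in retry:
            kwargs[key] = timedelta(seconds=retry[key])
    for key in ("maximum_attempts", "backoff_coefficient"):
        if key in retry:
            kwargs[key] = retry[key]
    return kwargs


temporal_config = {
    "retry_policy": {
        "initial_interval": 1,
        "maximum_interval": 10,
        "maximum_attempts": 3,
        "backoff_coefficient": 2.0,
    },
}
kwargs = retry_policy_kwargs(temporal_config)
print(kwargs["maximum_interval"])  # 0:00:10
# With temporalio installed, these can be passed as RetryPolicy(**kwargs).
```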
Session Configuration
# Memory session (temporary, for testing)
session = SessionFactory.create_memory_session("test_session")
# File session (persistent across restarts)
session = SessionFactory.create_file_session(
session_id="production_session_123",
db_path="data/sessions/production_session_123.db"
)
# Load existing session or create new
session = SessionFactory.load_or_create_session(
session_id="user_456_conversation",
db_path="data/sessions/user_456.db"
)
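File sessions persist because the conversation lives in a SQLite file keyed by session ID. The sketch below uses only Python's `sqlite3` module to show that "load or create" behavior. `TinyFileSession` and its schema are hypothetical and not how Agency's `SessionFactory` stores data. The sketch also shows a deployment constraint. A reopened session sees earlier messages only if it points at the same file, so Temporal workers on different hosts need shared access to `db_path`.

```python
import os
import sqlite3
import tempfile
from typing import List


class TinyFileSession:
    """Illustrative SQLite-backed message store; not Agency's SQLiteSession."""

    def __init__(self, session_id: str, db_path: str) -> None:
        self.session_id = session_id
        self.conn = sqlite3.connect(db_path)
        # CREATE TABLE IF NOT EXISTS gives "load or create" semantics.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, content TEXT)"
        )

    def add(self, content: str) -> None:
        with self.conn:  # commits the insert on success
            self.conn.execute(
                "INSERT INTO messages (session_id, content) VALUES (?, ?)",
                (self.session_id, content),
            )

    def messages(self) -> List[str]:
        rows = self.conn.execute(
            "SELECT content FROM messages WHERE session_id = ? ORDER BY id",
            (self.session_id,),
        )
        return [row[0] for row in rows]

    def close(self) -> None:
        self.conn.close()


db_path = os.path.join(tempfile.mkdtemp(), "user_456.db")
first = TinyFileSession("user_456_conversation", db_path)
first.add("What's my order status?")
first.close()

# Later, e.g. in a new process that can read the same file
reopened = TinyFileSession("user_456_conversation", db_path)
print(reopened.messages())  # ["What's my order status?"]
```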
Langfuse Observability Configuration
# Auto-detect best option (local → remote → basic logging)
agency = Agency(..., langfuse_mode=LangfuseMode.AUTO)
# Force local development Langfuse
agency = Agency(..., langfuse_mode=LangfuseMode.LOCAL)
# Use production Langfuse
agency = Agency(..., langfuse_mode=LangfuseMode.REMOTE)
# Disable observability for maximum performance
agency = Agency(..., langfuse_mode=LangfuseMode.DISABLED)
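The comment on `LangfuseMode.AUTO` describes an ordered fallback. The sketch below models that decision. The probe callables are hypothetical placeholders, for example a health check against a local Langfuse instance or a check that remote credentials are set. `ObservabilityMode` is a stand-in enum, not Agency's `LangfuseMode`.

```python
from enum import Enum
from typing import Callable, Dict


class ObservabilityMode(Enum):
    LOCAL = "local"
    REMOTE = "remote"
    BASIC_LOGGING = "basic_logging"


def resolve_auto_mode(probes: Dict[ObservabilityMode, Callable[[], bool]]) -> ObservabilityMode:
    """Return the first reachable backend in AUTO's order, else basic logging."""
    for mode in (ObservabilityMode.LOCAL, ObservabilityMode.REMOTE):
        probe = probes.get(mode)
        if probe is not None and probe():
            return mode
    return ObservabilityMode.BASIC_LOGGING


# Example: no local Langfuse running, remote credentials configured
probes = {
    ObservabilityMode.LOCAL: lambda: False,
    ObservabilityMode.REMOTE: lambda: True,
}
print(resolve_auto_mode(probes).value)  # remote
```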
Session Flow Architecture
Sessions (SQLiteSession) provide conversation memory that is shared by the Agency, strategies, processors, and Temporal activities; file-backed sessions also persist across restarts.
Session Data Flow
sequenceDiagram
participant App as Application
participant Agency as Agency
participant Strategy as Strategy
participant Processor as Processor
participant Activity as Temporal Activity
participant Session as SQLiteSession
App->>Agency: Create with session
Note over Agency,Session: Session attached to Agency
App->>Agency: run("User message")
Agency->>Strategy: execute(input, context)
Strategy->>Processor: run(agent, input, context)
alt Direct Execution
Processor->>Session: Load conversation history
Session-->>Processor: Previous messages
Processor->>Session: Save new messages
Session-->>Processor: Confirmation
Processor-->>Strategy: Agent result
else Temporal Execution
Processor->>Activity: execute_strategy_directly()
Activity->>Session: Recreate session from config
Note over Activity,Session: Session config contains:<br/>- session_type (memory/file)<br/>- user_id<br/>- db_path (if file)<br/>- langfuse_session_id
Session-->>Activity: Session instance
Activity->>Session: Load conversation history
Session-->>Activity: Previous messages
Activity->>Session: Save new messages
Session-->>Activity: Confirmation
Activity-->>Processor: Activity result
end
Strategy-->>Agency: Strategy result
Agency-->>App: Final result
Note over App,Session: Session persists across all executions<br/>maintaining conversation continuity
Session Persistence in Temporal
Session Configuration Serialization
# When creating a Temporal workflow, session info is serialized
processor_config = {
"service_name": "Customer Support",
"langfuse_mode": "local",
"session_id": langfuse_session_id,
"user_id": user_id,
"session_config": {
"session_type": "file", # or "memory"
"session_id": session.session_id,
"user_id": user_id,
"db_path": "/data/sessions/user_123.db",
"langfuse_session_id": langfuse_session_id
}
}
Session Recreation in Activities
from typing import Optional
from temporalio import activity
@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    strategy_overrides: Optional[dict] = None,
    context: Optional[dict] = None,
) -> StrategyResult:
    """Activity recreates session from configuration."""
    # Extract session configuration
    session_config = processor_config.get("session_config")
    session = None
    if session_config:
        session_type = session_config.get("session_type", "memory")
        if session_type == "memory":
            # For memory sessions, use user_id to maintain continuity
            user_id = session_config.get("user_id")
            session = SessionFactory.create_user_session(user_id, "temp_sessions")
        elif session_type == "file":
            # For file sessions, recreate using session_id and db_path
            # (db_path must be reachable from every worker host)
            session_id = session_config.get("session_id")
            db_path = session_config.get("db_path")
            if session_id and db_path:
                session = SessionFactory.load_or_create_session(session_id, db_path)
    # Create processor with recreated session
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
        session=session,  # Recreated session maintains conversation history
    )
    # Execute strategy with persistent session
    strategy_class = StrategyRegistry.get(strategy_name)
    strategy = strategy_class(**(strategy_overrides or {}))
    strategy.set_processor(processor)
    result = await strategy.execute(input_data, context)
    return result
Session Types and Use Cases
Memory Sessions
# Temporary sessions (testing, ephemeral conversations)
session = SessionFactory.create_memory_session("temp_session")
# Use case: Unit tests, temporary demos, stateless interactions
agency = Agency(
name="Test Agency",
strategy=TestStrategy(),
session=session
)
File Sessions
# Persistent sessions (production, long-term conversations)
session = SessionFactory.create_file_session(
session_id="customer_support_user_123",
db_path="data/sessions/customer_support_user_123.db"
)
# Use case: Customer support, ongoing conversations, production systems
agency = Agency(
name="Customer Support",
strategy=SupportStrategy(),
session=session,
user_id="user_123"
)
User-Based Sessions
# Automatic session management per user
from agency.sessions.session_factory import SessionFactory
def create_user_agency(user_id: str) -> Agency:
    """Create an agency with persistent session for a user."""
    # Create or load existing session for user
    session = SessionFactory.load_or_create_session(
        session_id=f"user_{user_id}_main_session",
        db_path=f"data/sessions/user_{user_id}.db"
    )
    return Agency(
        name=f"Personal Assistant for {user_id}",
        strategy_name="router",
        temporal_config={"enabled": True},
        session=session,
        user_id=user_id,
        langfuse_mode=LangfuseMode.AUTO
    )
# Usage
agency = create_user_agency("user_123")
result1 = await agency.run("What's my order status?")  # First interaction
result2 = await agency.run("Thanks for the update!")  # Continues conversation
Session Debugging and Monitoring
Session Information
# Get session info from agency
session_info = agency.get_session_info()
print(f"Session ID: {session_info['session_id']}")
print(f"Message count: {session_info['message_count']}")
print(f"Created: {session_info['created_at']}")
# Direct session access
if agency.session:
messages = agency.session.get_messages()
print(f"Conversation history: {len(messages)} messages")
Session Persistence Verification
# Test session persistence across Temporal activities
async def test_session_persistence():
    agency = Agency(
        name="Test",
        strategy_name="router",
        session=SessionFactory.create_file_session("test", "test.db"),
        temporal_config={"enabled": True}
    )
    # First interaction
    result1 = await agency.run("Remember that I like coffee", use_temporal=True)
    # Second interaction (should remember context)
    result2 = await agency.run("What do I like to drink?", use_temporal=True)
    # Verify session contains both interactions
    messages = agency.session.get_messages()
    assert len(messages) >= 4  # 2 user messages + 2 assistant responses
Best Practices
Do
# Use meaningful workflow IDs
workflow_id = f"customer-{customer_id}-inquiry-{timestamp}"
# Set appropriate timeouts
start_to_close_timeout=timedelta(seconds=30) # Activity timeout
execution_timeout=timedelta(minutes=5) # Total workflow timeout
# Include context for debugging
context = {
"request_id": request_id,
"user_id": user_id,
"source": "api",
"version": "1.0"
}
# Use task queue routing for specialized workers
task_queue = "gpu-intensive-queue" if needs_gpu else "default-queue"
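Meaningful IDs are most useful when they are also deterministic. Temporal rejects starting a workflow whose ID belongs to one that is already running. Deriving the ID from the business request therefore makes a retried client call land on the same execution instead of creating a duplicate. A minimal sketch follows; `make_workflow_id` and its normalization rules are hypothetical.

```python
import re


def make_workflow_id(kind: str, entity_id: str, request_id: str) -> str:
    """Readable, deterministic workflow ID: the same request always maps to the same ID."""
    parts = [kind, entity_id, request_id]
    # Normalize to lowercase letters, digits, '_' and '-' so IDs are log- and URL-friendly.
    cleaned = [re.sub(r"[^a-z0-9_]+", "-", p.lower()).strip("-") for p in parts]
    return "-".join(cleaned)


print(make_workflow_id("customer", "C-1042", "Req 77"))  # customer-c-1042-req-77
```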
Avoid
# Don't use bare random workflow IDs
workflow_id = str(uuid.uuid4())  # Hard to trace back to a request
# Don't set infinite timeouts
start_to_close_timeout=None  # Can hang forever
# Don't leave retry behavior implicit
retry_policy=None  # Default activity policy retries without limit, masking persistent errors
# Don't hardcode configuration
temporal_address="localhost:7233" # Use environment variables
Local Development Setup
Docker Compose
Start the full Temporal stack locally:
# Start Temporal, PostgreSQL, and Web UI
docker-compose up temporal temporal-postgres temporal-web
# Access Temporal UI
open http://localhost:8080
# Temporal server available at localhost:7233
Start Workers
# Start an Agency worker
poetry run python run_agency_worker.py
# Worker output
Temporal v2 worker started on task queue: agency-task-queue
Available strategies: ['router', 'sequential', 'map_reduce']
Workflow: GenericStrategyWorkflow
Activity: execute_strategy_directly
Max concurrent activities: 10
Test Workflow Execution
# Example client script
import asyncio
import uuid
from temporalio.client import Client
from agency.temporal.core import GenericStrategyWorkflow
async def test_temporal_execution():
    # Connect to local Temporal
    client = await Client.connect("localhost:7233")
    # Execute a strategy
    result = await client.execute_workflow(
        GenericStrategyWorkflow.run,
        args=[
            "router",
            "Test input",
            {"service_name": "Test", "langfuse_mode": "local"},
        ],
        id=f"test-{uuid.uuid4()}",
        task_queue="agency-task-queue",
    )
    print(f"Result: {result}")
# Run the test
if __name__ == "__main__":
    asyncio.run(test_temporal_execution())
Production Deployment
Temporal Cloud
For production, consider Temporal Cloud for managed infrastructure:
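from temporalio.client import Client
from temporalio.service import TLSConfig
# Connect to Temporal Cloud with mTLS (client_cert and client_key are PEM bytes)
client = await Client.connect(
    "your-namespace.tmprl.cloud:7233",
    namespace="your-namespace",  # Must match your Cloud namespace
    tls=TLSConfig(
        client_cert=client_cert,
        client_private_key=client_key,
    ),
)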
Monitoring
Integrate with your observability stack:
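# Prometheus metrics are configured on the SDK runtime, not on the Worker
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import Worker
runtime = Runtime(
    telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090"))
)
client = await Client.connect("localhost:7233", runtime=runtime)
worker = Worker(
    client,
    task_queue="agency-task-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
)
# Custom metrics (inside workflow code)
workflow.metric_meter().create_counter("strategies_executed")
workflow.metric_meter().create_histogram("strategy_duration")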
Troubleshooting
Common Issues
Worker Not Picking Up Tasks
# Check worker is connected to correct task queue
grep "task queue" worker.log
# Verify Temporal server is running (port 7233 speaks gRPC, not plain HTTP)
temporal operator cluster health
# Check for workflow errors in UI
open http://localhost:8080
Workflow Timeouts
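# Increase timeouts for long-running strategies
execution_timeout=timedelta(hours=1)
start_to_close_timeout=timedelta(minutes=30)
# Add heartbeats for long activities; set heartbeat_timeout so stalled activities are detected
heartbeat_timeout=timedelta(minutes=1)
activity.heartbeat("Processing batch 10 of 100")  # Called from inside the activity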
High Memory Usage
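# Limit concurrent activities
max_concurrent_activities=5  # Reduce from the SDK default
# Use continue-as-new to keep workflow history bounded for large datasets
batch, remaining_items = items[:1000], items[1000:]
# ... process `batch` here ...
if remaining_items:
    # Ends this run and starts a fresh one (new, empty history); it raises, so no await
    workflow.continue_as_new(remaining_items)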
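The batching logic behind continue-as-new is easy to reason about in isolation. Each run handles one bounded batch and hands the rest to a fresh run, so no single history grows with the dataset. A stdlib-only simulation follows; `plan_runs` is a hypothetical helper that only counts runs and does not call Temporal.

```python
from typing import List


def plan_runs(items: List[int], batch_size: int = 1000) -> List[int]:
    """Simulate continue-as-new: each run handles one batch, then restarts with the rest."""
    run_sizes = []
    remaining = items
    while remaining:
        batch, remaining = remaining[:batch_size], remaining[batch_size:]
        run_sizes.append(len(batch))  # one workflow run with a bounded history
    return run_sizes


print(plan_runs(list(range(2500))))  # [1000, 1000, 500]
```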
Related Topics
- Temporal Workers: Detailed worker configuration and management
- Observability: Tracing and monitoring with OpenTelemetry
- Processors: Processor integration with Temporal activities
- Strategies: Strategy patterns and Temporal execution
When to Use Temporal
Use Temporal when you need:
- Fault-tolerant execution of critical workflows
- Complex multi-step agent orchestrations
- Long-running operations (hours or days)
- Automatic retries with exponential backoff
- Complete audit trail of executions
- Horizontal scaling across multiple workers
Use direct execution when you need:
- Simple, fast agent calls
- Minimal infrastructure
- Sub-second response times
- Development and testing
Temporal Resources