Temporal Workers
Temporal Workers are the execution engines that run your AI agent workflows and activities. They poll task queues for work, execute the code, and report results back to the Temporal server.
What are Workers?
Workers are long-running processes that:
- Poll Task Queues: Continuously check for available work
- Execute Workflows: Run the orchestration logic
- Execute Activities: Perform the actual work (running agents)
- Report Failures: Send errors back to the server, which schedules retries according to the configured retry policy
- Scale Horizontally: Multiple workers can share the load
Think of workers as the "compute layer" that actually runs your code, while the Temporal server handles coordination.
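The poll, execute, report cycle can be sketched as a toy, in-process model. Here an `asyncio.Queue` stands in for a task queue and a dict stands in for the server's record of results. Real Temporal workers long-poll the server over gRPC, and `toy_worker` and `run_agent` are illustrative names, not Temporal or Agency APIs.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

# Toy model only: an asyncio.Queue stands in for a Temporal task queue and a
# dict stands in for the server's record of results. Not Temporal/Agency APIs.
Job = Tuple[str, Any]  # (task_id, payload)


async def toy_worker(
    queue: "asyncio.Queue[Optional[Job]]",
    handler: Callable[[Any], Awaitable[Any]],
    results: Dict[str, Any],
) -> None:
    while True:
        job = await queue.get()                        # 1. poll for work
        if job is None:                                # sentinel: shut down
            return
        task_id, payload = job
        try:
            results[task_id] = await handler(payload)  # 2. execute
        except Exception as exc:
            results[task_id] = f"failed: {exc}"        # 3a. report failure
        # 3b. loop back and poll for the next task


async def main() -> Dict[str, Any]:
    queue: "asyncio.Queue[Optional[Job]]" = asyncio.Queue()
    results: Dict[str, Any] = {}

    async def run_agent(text: str) -> str:
        await asyncio.sleep(0)                         # pretend to call an LLM
        return text.upper()

    # Two workers share one queue: horizontal scaling in miniature
    workers = [asyncio.create_task(toy_worker(queue, run_agent, results))
               for _ in range(2)]
    for i, text in enumerate(["hello", "world", "agents"]):
        queue.put_nowait((f"task-{i}", text))
    for _ in workers:
        queue.put_nowait(None)                         # one sentinel per worker
    await asyncio.gather(*workers)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because both workers pull from the same queue, adding a third worker would only change who picks up each task, not the results.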
Worker Fundamentals
Worker Lifecycle
sequenceDiagram
participant Worker
participant TaskQueue as Task Queue
participant TemporalServer as Temporal Server
participant Activity as Agent Activity
Worker->>TaskQueue: Poll for work
TaskQueue-->>Worker: Task available
Worker->>TemporalServer: Claim task
Worker->>Activity: Execute agent strategy
Activity-->>Worker: Result
Worker->>TemporalServer: Complete task
Worker->>TaskQueue: Poll for next work
Worker Components
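Each worker is configured with:
# Worker registration
from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
from agency.temporal.core import GenericStrategyWorkflow, execute_strategy_directly
client = await Client.connect("localhost:7233")  # Temporal client connection
worker = Worker(
    client,
    task_queue="agency-task-queue",  # Named queue to poll
    workflows=[GenericStrategyWorkflow],  # Workflows this worker can run
    activities=[execute_strategy_directly],  # Activities this worker can execute
    max_concurrent_activities=10,  # Concurrency limit
    workflow_runner=UnsandboxedWorkflowRunner(),  # Runs workflows without the sandbox
)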
TemporalWorkerManager
The TemporalWorkerManager provides a high-level interface for managing Agency workers:
Basic Worker Setup
from agency.temporal.core import TemporalWorkerManager
# Create a worker manager
worker_manager = TemporalWorkerManager(task_queue="agency-task-queue")
# Start the worker
worker = await worker_manager.start_worker(
temporal_address="localhost:7233",
max_concurrent_activities=10
)
# Run the worker (this blocks)
await worker_manager.run_worker()
Worker Output
When you start a worker, you'll see:
Temporal v2 worker started on task queue: agency-task-queue
Available strategies: ['router', 'sequential', 'map_reduce', 'specialist']
Workflow: GenericStrategyWorkflow
Activity: execute_strategy_directly
Max concurrent activities: 10
Starting Temporal v2 worker for task queue: agency-task-queue
Advanced Configuration
# Custom configuration
worker_manager = TemporalWorkerManager(
task_queue="gpu-intensive-queue" # Specialized queue
)
worker = await worker_manager.start_worker(
temporal_address="production.temporal.io:7233",
max_concurrent_activities=5, # Limit for resource-intensive tasks
)
# Get client for workflow execution
client = await worker_manager.get_client()
Worker Pool Management
Persistent Worker Pool
The framework includes a persistent worker pool for production deployments:
from agency.temporal.persistent_worker_pool import PersistentWorkerPool
# Create a pool of workers
pool = PersistentWorkerPool(
num_workers=5,
task_queue="agency-task-queue",
temporal_address="localhost:7233",
max_concurrent_activities=10
)
# Start all workers
await pool.start()
# Workers run indefinitely, automatically restart on failure
# Stop gracefully
await pool.stop()
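The pool's internals are not shown here, but the restart-on-failure behavior can be sketched as a supervisor loop in plain asyncio. This is a hypothetical illustration (`supervise`, `max_restarts`, and `flaky_worker` are invented names), not how `PersistentWorkerPool` is actually implemented.

```python
import asyncio
from typing import Awaitable, Callable


async def supervise(
    run_worker: Callable[[], Awaitable[None]],
    max_restarts: int = 3,
    backoff_seconds: float = 0.01,
) -> int:
    """Run a worker coroutine, restarting it after crashes.

    Returns the number of restarts performed. Re-raises once max_restarts
    is exceeded so a permanently broken worker is not silently hidden.
    """
    restarts = 0
    while True:
        try:
            await run_worker()
            return restarts                  # clean exit: stop supervising
        except asyncio.CancelledError:
            raise                            # shutdown requested: don't restart
        except Exception:
            if restarts >= max_restarts:
                raise
            restarts += 1
            # Exponential backoff avoids hammering the server in a crash loop
            await asyncio.sleep(backoff_seconds * 2 ** (restarts - 1))


async def demo() -> int:
    attempts = 0

    async def flaky_worker() -> None:
        nonlocal attempts
        attempts += 1
        if attempts < 3:                     # crash on the first two runs
            raise ConnectionError("lost connection to Temporal")

    return await supervise(flaky_worker)
```

Capping restarts is a design choice: an unbounded restart loop can mask a configuration error (wrong address, missing strategy) that no amount of retrying will fix.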
Pool Monitoring
# Monitor pool health
status = await pool.get_status()
print(f"Active workers: {status['active_workers']}")
print(f"Failed workers: {status['failed_workers']}")
print(f"Total tasks processed: {status['total_tasks']}")
# Restart failed workers
await pool.restart_failed_workers()
Development Workflow
Local Development
1. Start Temporal Server
# Using Docker Compose
docker-compose up temporal temporal-postgres temporal-web
# Temporal server: localhost:7233
# Temporal UI: http://localhost:8080
2. Register Your Strategies
# In your application startup
from agency.temporal.core import register_strategy
from agency.strategies import Strategy  # base class (import path assumed)
@register_strategy("router")
class RouterStrategy(Strategy):
    ...  # routing logic goes here
@register_strategy("sequential")
class SequentialStrategy(Strategy):
    ...  # sequential pipeline logic goes here
3. Start Worker
# Using the provided worker script
poetry run python run_agency_worker.py
# Or create your own worker script:
# custom_worker.py
import asyncio
from agency.temporal.core import TemporalWorkerManager
from agency.strategies import register_all_strategies
async def main():
    # Register all your strategies before the worker starts polling
    register_all_strategies()
    # Start worker
    worker_manager = TemporalWorkerManager("my-task-queue")
    await worker_manager.start_worker()
    print("Worker started, press Ctrl+C to stop")
    try:
        await worker_manager.run_worker()
    finally:
        # Also runs on Ctrl+C: asyncio.run() cancels main() on interrupt
        print("Stopping worker...")
        await worker_manager.stop_worker()
if __name__ == "__main__":
    asyncio.run(main())
4. Test with Client
# test_client.py
import asyncio
import uuid
from temporalio.client import Client
from agency.temporal.core import GenericStrategyWorkflow
async def test_execution():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        GenericStrategyWorkflow.run,
        args=[
            "router",
            "Test input",
            {"service_name": "Test Service", "langfuse_mode": "local"},
        ],
        id=f"test-{uuid.uuid4()}",  # unique workflow ID per run
        task_queue="my-task-queue",  # must match the worker's task queue
    )
    print(f"Result: {result}")
asyncio.run(test_execution())
Task Queue Configuration
Task Queue Strategy
Task queues enable flexible worker deployment:
Single Queue (Simple)
# All workers use the same queue
task_queue = "agency-task-queue"
# Good for:
# - Simple deployments
# - Development
# - Small teams
Specialized Queues (Advanced)
# Different queues for different workloads
queues = {
"fast-queries": "quick-response-queue", # < 5 second responses
"complex-analysis": "long-running-queue", # > 30 second operations
"gpu-inference": "gpu-worker-queue", # GPU-intensive work
}
# Deploy workers with different capabilities to different queues
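One way to use a mapping like this is a small routing helper that picks the queue before a workflow is started. A hypothetical sketch: the `Workload` type and `pick_task_queue` are not part of Agency, and the 5-second cut-off simply mirrors the comments above.

```python
from dataclasses import dataclass

QUEUES = {
    "fast-queries": "quick-response-queue",    # < 5 second responses
    "complex-analysis": "long-running-queue",  # > 30 second operations
    "gpu-inference": "gpu-worker-queue",       # GPU-intensive work
}


@dataclass(frozen=True)
class Workload:
    # Hypothetical description of a request; not an Agency type.
    expected_seconds: float
    needs_gpu: bool = False


def pick_task_queue(work: Workload) -> str:
    """Choose the task queue a workflow should be started on."""
    if work.needs_gpu:
        return QUEUES["gpu-inference"]
    if work.expected_seconds < 5:
        return QUEUES["fast-queries"]
    # Anything not clearly fast goes to the long-running pool so it
    # cannot occupy the slots reserved for quick requests.
    return QUEUES["complex-analysis"]
```

The result is what you would pass as `task_queue=` to `client.execute_workflow(...)`; only workers polling that queue will pick the workflow up.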
Queue-based Scaling
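# Scale different types of work independently
# (illustrative: in production each worker usually runs in its own process or pod)
managers = []
# 10 fast workers
for _ in range(10):
    worker_manager = TemporalWorkerManager("quick-response-queue")
    await worker_manager.start_worker(max_concurrent_activities=20)
    managers.append(worker_manager)  # keep a reference so each can be stopped later
# 3 GPU workers
for _ in range(3):
    worker_manager = TemporalWorkerManager("gpu-worker-queue")
    await worker_manager.start_worker(max_concurrent_activities=2)
    managers.append(worker_manager)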
Worker Configuration
Concurrency Settings
# High concurrency for I/O-bound work (API calls, LLM requests)
worker = await worker_manager.start_worker(
max_concurrent_activities=50 # Many concurrent activities
)
# Low concurrency for CPU/GPU-bound work
worker = await worker_manager.start_worker(
max_concurrent_activities=2 # Limited concurrent activities
)
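Conceptually, `max_concurrent_activities` caps how many activity tasks one worker holds at once. The toy model below uses an `asyncio.Semaphore` to show why a higher cap helps I/O-bound work: while one task awaits a simulated LLM call, others can proceed. It models the idea only; the Temporal SDK enforces the limit internally, not with this code, and `run_with_limit` is an illustrative name.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int, io_seconds: float) -> int:
    """Run n_tasks simulated I/O-bound activities, at most `limit` at once.

    Returns the peak number of activities that were in flight together.
    """
    slots = asyncio.Semaphore(limit)   # stand-in for max_concurrent_activities
    in_flight = 0
    peak = 0

    async def activity() -> None:
        nonlocal in_flight, peak
        async with slots:              # wait for a free slot
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(io_seconds)  # simulated API/LLM latency
            in_flight -= 1

    await asyncio.gather(*(activity() for _ in range(n_tasks)))
    return peak
```

With 10 tasks, a limit of 3, and 1 second of latency each, the batch runs in four rounds (about 4 seconds); with a limit of 10 it takes about one. For CPU-bound work the extra slots buy nothing, because the tasks compete for the same cores.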
Resource Limits
# Memory-constrained environments
worker = Worker(
client,
task_queue="memory-limited-queue",
workflows=[GenericStrategyWorkflow],
activities=[execute_strategy_directly],
max_concurrent_activities=3,
max_cached_workflows=100, # Limit workflow cache
workflow_runner=UnsandboxedWorkflowRunner(),
)
Timeout Configuration
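from datetime import timedelta
# Heartbeat throttling (these are not activity timeouts)
worker = Worker(
    client,
    task_queue="agency-task-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
    # Longest interval between heartbeats actually sent to the server
    max_heartbeat_throttle_interval=timedelta(seconds=60),
    # Throttle interval used when an activity has no heartbeat_timeout
    default_heartbeat_throttle_interval=timedelta(seconds=30),
)
# Activity timeouts (start_to_close_timeout, heartbeat_timeout) are set by the
# workflow when it calls workflow.execute_activity(), not on the Worker.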
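The two throttle settings interact with each activity's `heartbeat_timeout`. Our reading of the temporalio `Worker` parameter docs is that heartbeats are batched and sent at most every 80% of the activity's heartbeat timeout, or every `default_heartbeat_throttle_interval` when no timeout is set, never exceeding `max_heartbeat_throttle_interval`. The function below models that assumed rule for reasoning about settings; it is not SDK code.

```python
from datetime import timedelta
from typing import Optional


def effective_heartbeat_throttle(
    heartbeat_timeout: Optional[timedelta],
    max_interval: timedelta = timedelta(seconds=60),
    default_interval: timedelta = timedelta(seconds=30),
) -> timedelta:
    """Model of how often heartbeats actually reach the server.

    Assumed rule: 80% of heartbeat_timeout, or default_interval when no
    heartbeat_timeout is set, capped at max_interval.
    """
    if heartbeat_timeout is None:
        interval = default_interval
    else:
        interval = heartbeat_timeout * 0.8
    return min(interval, max_interval)
```

Under this rule, an activity with a 10-second heartbeat timeout that calls `activity.heartbeat()` in a tight loop still only reports to the server about every 8 seconds, which keeps it safely inside its timeout.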
Production Deployment
Deployment Patterns
Container-based Deployment
# Dockerfile for Agency worker
FROM python:3.10-slim
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY . /app
WORKDIR /app
# Register strategies and start worker
CMD ["python", "production_worker.py"]
Kubernetes Deployment
# agency-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agency-worker
spec:
  replicas: 5  # Scale based on load
  selector:
    matchLabels:
      app: agency-worker
  template:
    metadata:
      labels:
        app: agency-worker
    spec:
      containers:
        - name: worker
          image: agency-worker:latest
          env:
            - name: TEMPORAL_ADDRESS
              value: "temporal.production.svc.cluster.local:7233"
            - name: TASK_QUEUE
              value: "agency-task-queue"
            - name: MAX_CONCURRENT_ACTIVITIES
              value: "10"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
Auto-scaling Configuration
# agency-worker-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agency-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agency-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          # Custom metric: requires a metrics adapter (e.g. Prometheus Adapter) to expose it
          name: temporal_task_queue_depth
        target:
          type: AverageValue
          averageValue: "10"
Production Monitoring
# Enhanced production worker with Prometheus metrics
import prometheus_client
from agency.temporal.core import TemporalWorkerManager
class ProductionWorkerManager(TemporalWorkerManager):
    def __init__(self, task_queue: str):
        super().__init__(task_queue)
        # Each metric needs a name and a help string; they register in the
        # global registry, so create only one manager per process
        self.metrics = {
            'tasks_processed': prometheus_client.Counter(
                'tasks_processed_total', 'Tasks processed by this worker'),
            'task_duration': prometheus_client.Histogram(
                'task_duration_seconds', 'Task execution time in seconds'),
            'active_workers': prometheus_client.Gauge(
                'active_workers', 'Number of running workers'),
        }
    async def start_worker(self, **kwargs):
        worker = await super().start_worker(**kwargs)
        # Serve metrics over HTTP on port 8000 for Prometheus to scrape
        prometheus_client.start_http_server(8000)
        return worker
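Metrics only help once something updates them. A minimal pattern is to wrap each unit of work so it counts completions and records its duration. To stay dependency-free, this sketch records into a plain `TaskStats` object; with `prometheus_client` you would call `Counter.inc()` and `Histogram.observe(seconds)` at the same two points. `TaskStats`, `instrumented`, and `run_strategy` are hypothetical names.

```python
import asyncio
import functools
import time
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class TaskStats:
    # Plain-Python stand-in for the Counter/Histogram pair above.
    processed: int = 0
    failed: int = 0
    durations: list[float] = field(default_factory=list)


def instrumented(stats: TaskStats):
    """Decorator that counts and times an async task function."""
    def wrap(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def inner(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                result = await fn(*args, **kwargs)
            except Exception:
                stats.failed += 1          # count failures separately
                raise
            finally:
                # ~ Histogram.observe(): record duration for every attempt
                stats.durations.append(time.perf_counter() - start)
            stats.processed += 1           # ~ Counter.inc() on success
            return result
        return inner
    return wrap


stats = TaskStats()


@instrumented(stats)
async def run_strategy(text: str) -> str:  # hypothetical task
    await asyncio.sleep(0)
    return text[::-1]
```

Recording failed attempts in the duration series as well keeps slow timeouts visible in latency percentiles instead of silently dropping them.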
Health Checks and Monitoring
Worker Health Checks
# Health check endpoint for workers
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from agency.temporal.worker_manager import WorkerManager
app = FastAPI()
worker_manager: WorkerManager | None = None  # assigned when the worker starts
@app.get("/health")
async def health_check():
    if worker_manager and worker_manager.is_healthy():
        return {"status": "healthy", "worker_count": worker_manager.active_workers}
    # FastAPI does not treat a (body, status) tuple as a status code; return a response
    return JSONResponse(status_code=503, content={"status": "unhealthy"})
@app.get("/metrics")
async def get_metrics():
    if worker_manager:
        return {
            "tasks_processed": worker_manager.tasks_processed,
            "average_task_duration": worker_manager.average_task_duration,
            "error_rate": worker_manager.error_rate,
        }
    return JSONResponse(status_code=503, content={"error": "Worker not available"})
Temporal UI Monitoring
Access the Temporal UI to monitor:
- Worker Status: See which workers are active
- Task Queue Depth: Monitor backlog
- Workflow Execution: Track success/failure rates
- Performance Metrics: Average execution times
# Local Temporal UI
open http://localhost:8080
# Look for:
# - Workers tab: Active worker instances
# - Task Queues: Queue depth and processing rates
# - Workflows: Execution history and performance
Alerting
Set up alerts for key metrics:
# Example alerting thresholds
ALERTS = {
"worker_down": "No workers active for > 1 minute",
"queue_depth": "Task queue depth > 100 items",
"error_rate": "Error rate > 5% over 5 minutes",
"latency": "P95 latency > 30 seconds",
}
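These thresholds can be checked mechanically. The sketch below evaluates them against a snapshot of collected metrics; the `MetricsSnapshot` fields are assumptions, the time windows ("> 1 minute", "over 5 minutes") are left to whatever collects the snapshot, and in practice you would usually express the same rules in your alerting system rather than in worker code.

```python
import statistics
from dataclasses import dataclass


@dataclass
class MetricsSnapshot:
    # Assumed shape of the numbers collected over the alerting window
    active_workers: int
    queue_depth: int
    tasks_total: int
    tasks_failed: int
    latencies_seconds: list[float]


def firing_alerts(m: MetricsSnapshot) -> list[str]:
    """Return the names of ALERTS-style rules the snapshot violates."""
    alerts = []
    if m.active_workers == 0:
        alerts.append("worker_down")
    if m.queue_depth > 100:
        alerts.append("queue_depth")
    if m.tasks_total and m.tasks_failed / m.tasks_total > 0.05:
        alerts.append("error_rate")
    if len(m.latencies_seconds) >= 2:
        # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
        p95 = statistics.quantiles(m.latencies_seconds, n=100)[94]
        if p95 > 30:
            alerts.append("latency")
    return alerts
```

Using a percentile rather than the mean matters here: a few very slow agent runs can breach the latency target while the average still looks fine.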
Troubleshooting
Common Issues
Worker Not Starting
# Check Temporal server connectivity (port 7233 serves gRPC, not HTTP, so curl won't work)
temporal operator cluster health --address localhost:7233
# Check worker logs
poetry run python run_agency_worker.py --verbose
# Verify task queue name
grep "task_queue" worker_config.py
No Tasks Being Processed
# Check if workflows are being scheduled to correct task queue
# In Temporal UI: Workflows > Task Queue column
# Verify worker is polling correct queue
grep "Temporal v2 worker started on task queue" worker.log
temporal task-queue describe --task-queue agency-task-queue  # lists active pollers
# Check for worker errors
grep ERROR worker.log
High Memory Usage
# Reduce concurrent activities
max_concurrent_activities=5  # Down from 10
# Limit workflow cache
max_cached_workflows=50  # Down from the SDK default of 1000
# Use continue_as_new to cap history size in long-running workflows
if iteration_count > 1000:
    workflow.continue_as_new(remaining_data)  # raises internally; do not await it
Activity Timeouts
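# Increase activity timeout (passed to workflow.execute_activity)
start_to_close_timeout=timedelta(minutes=10)  # Up from 2 minutes
# Add heartbeats for long-running activities
from temporalio import activity
@activity.defn
async def long_running_activity():
    for i in range(100):
        # Report progress; set heartbeat_timeout on the activity call so
        # missed heartbeats are detected
        activity.heartbeat(f"Processing item {i}")
        await process_item(i)  # your per-item work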
Debugging Techniques
Enable Debug Logging
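import logging
logging.basicConfig(level=logging.DEBUG)  # surfaces temporalio's Python loggers
# Client.connect() has no debug flag. To step through workflow code in a
# debugger, pass debug_mode=True to the Worker (disables deadlock detection).
from temporalio.worker import Worker
worker = Worker(
    client,
    task_queue="agency-task-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
    debug_mode=True,
)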
Use Local Testing
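# Test activities directly without Temporal
from agency.temporal.core import execute_strategy_directly
result = await execute_strategy_directly(
    strategy_name="router",
    input_data="test input",
    processor_config={"service_name": "test"},
)
print(result)
# If the activity calls activity.heartbeat() or activity.info(), run it through
# temporalio.testing.ActivityEnvironment().run(...) to supply an activity context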
Check Worker Resources
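A worker process can report its own peak memory and CPU time using the standard library's `resource` module (Unix only). Note the unit quirk: `ru_maxrss` is in kilobytes on Linux but in bytes on macOS. From outside the process, `kubectl top pod -l app=agency-worker` gives a similar view if metrics-server is installed. `worker_resource_usage` is an illustrative helper, not an Agency API.

```python
import resource
import sys


def worker_resource_usage() -> dict[str, float]:
    """Peak memory (MiB) and CPU seconds used by this process so far (Unix only)."""
    usage = resource.getrusage(resource.RUSAGE_SELF)
    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return {
        "peak_rss_mib": usage.ru_maxrss / divisor,
        "user_cpu_seconds": usage.ru_utime,
        "system_cpu_seconds": usage.ru_stime,
    }


if __name__ == "__main__":
    for key, value in worker_resource_usage().items():
        print(f"{key}: {value:.2f}")
```

Comparing `peak_rss_mib` against the container's memory limit (1Gi in the Deployment above) shows how much headroom is left before raising `max_concurrent_activities`.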
Best Practices
Do
# Use descriptive task queue names
task_queue = "customer-support-agents"  # Clear purpose
# Set appropriate concurrency limits
max_concurrent_activities = min(cpu_cores * 2, available_memory_gb * 5)
# Include health checks
from datetime import datetime, timezone
@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": datetime.now(timezone.utc)}
# Implement graceful shutdown
async def shutdown_handler():
    await worker_manager.stop_worker()
    await cleanup_resources()
# Monitor key metrics
worker_start_time = time.time()
tasks_processed = 0
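The concurrency heuristic above, `min(cpu_cores * 2, available_memory_gb * 5)`, can be wrapped in a helper. In this sketch the floor of 1 is an added assumption, and the memory figure is passed in because the standard library has no portable way to read a container's memory limit. `suggested_max_concurrent_activities` is an illustrative name.

```python
import os
from typing import Optional


def suggested_max_concurrent_activities(
    available_memory_gb: float,
    cpu_cores: Optional[int] = None,
) -> int:
    """min(cpu_cores * 2, available_memory_gb * 5), never below 1.

    available_memory_gb is passed in (e.g. from the container's memory
    limit) because the standard library cannot read it portably.
    """
    cores = cpu_cores or os.cpu_count() or 1   # cpu_count() may return None
    return max(1, int(min(cores * 2, available_memory_gb * 5)))
```

This is a starting point for mixed workloads; the Performance Optimization notes below suggest going higher for purely I/O-bound agents and lower for CPU-bound ones.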
Avoid
# Don't use generic task queue names
task_queue = "queue1" # Not descriptive
# Don't set unlimited concurrency
max_concurrent_activities = float('inf') # Will exhaust resources
# Don't ignore worker failures
try:
    await worker.run()
except Exception:
    pass  # Silent failure is bad
# Don't hardcode configuration
temporal_address = "localhost:7233" # Use environment variables
Related Topics
- Temporal Workflows: Understanding workflow orchestration
- Observability: Monitoring and tracing workers
- Local Development: Setting up development environment
- Processors: How processors work in Temporal activities
Worker Deployment Strategy
Development: Single worker on local machine
Staging: Small worker pool (2-3 workers) with monitoring
Production: Auto-scaling worker pool with full observability
Performance Optimization
I/O Bound Work: High concurrency (20-50 activities)
CPU Bound Work: Low concurrency (1-5 activities)
Mixed Workloads: Use specialized task queues
Long Operations: Add heartbeats and use continue_as_new
Resource Management
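- Worker memory grows with the number of concurrent activities and cached workflows (max_cached_workflows)
- Each running activity occupies an asyncio task (async activities) or a slot in the worker's activity executor (synchronous activities)
- Monitor memory usage and set appropriate limits
- Use graceful shutdown so in-flight activities can finish instead of being abandoned and retried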
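Graceful shutdown means: stop accepting new work, give in-flight tasks a bounded time to finish, then cancel whatever remains. The asyncio sketch below shows that order, triggered by SIGTERM (what Kubernetes sends before killing a pod) or Ctrl+C. It is a generic pattern with hypothetical names; with Temporal the equivalent step is the worker's own shutdown call (the document's `stop_worker()`), and the Python SDK's `Worker` accepts a `graceful_shutdown_timeout` for the drain period.

```python
import asyncio
import signal


async def run_until_stopped(
    jobs: "asyncio.Queue[float]",
    stop: asyncio.Event,
    drain_timeout: float = 5.0,
) -> list[str]:
    """Take jobs until `stop` is set, then let in-flight jobs finish."""
    in_flight: set[asyncio.Task] = set()
    finished: list[str] = []

    async def handle(seconds: float) -> None:
        await asyncio.sleep(seconds)                   # simulated activity
        finished.append(f"job {seconds}s")

    stop_waiter = asyncio.create_task(stop.wait())
    while True:
        getter = asyncio.create_task(jobs.get())
        await asyncio.wait({getter, stop_waiter}, return_when=asyncio.FIRST_COMPLETED)
        if getter.done():
            task = asyncio.create_task(handle(getter.result()))
            in_flight.add(task)
            task.add_done_callback(in_flight.discard)
        if stop_waiter.done():
            getter.cancel()                            # 1. stop accepting new work
            await asyncio.gather(getter, return_exceptions=True)
            break

    if in_flight:
        # 2. give in-flight jobs up to drain_timeout seconds to complete
        _, pending = await asyncio.wait(in_flight, timeout=drain_timeout)
        for task in pending:                           # 3. cancel stragglers
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    return finished


async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Kubernetes sends SIGTERM before killing a pod; Ctrl+C sends SIGINT
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)         # Unix only
    jobs: "asyncio.Queue[float]" = asyncio.Queue()
    for seconds in (1.0, 2.0):
        jobs.put_nowait(seconds)
    print(await run_until_stopped(jobs, stop))


if __name__ == "__main__":
    asyncio.run(main())
```

Keep `drain_timeout` shorter than the pod's `terminationGracePeriodSeconds`, otherwise Kubernetes kills the process before the drain completes.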