Temporal Workers

Temporal Workers are the execution engines that run your AI agent workflows and activities. They poll task queues for work, execute the code, and report results back to the Temporal server.

What are Workers?

Workers are long-running processes that:

  • Poll Task Queues: Continuously check for available work
  • Execute Workflows: Run the orchestration logic
  • Execute Activities: Perform the actual work (running agents)
  • Handle Failures: Implement retry logic and error handling
  • Scale Horizontally: Multiple workers can share the load

Think of workers as the "compute layer" that actually runs your code, while Temporal server handles the coordination.

Worker Fundamentals

🔄 Worker Lifecycle

sequenceDiagram
    participant Worker
    participant TaskQueue as Task Queue
    participant TemporalServer as Temporal Server
    participant Activity as Agent Activity

    Worker->>TaskQueue: Poll for work
    TaskQueue-->>Worker: Task available
    Worker->>TemporalServer: Claim task
    Worker->>Activity: Execute agent strategy
    Activity-->>Worker: Result
    Worker->>TemporalServer: Complete task
    Worker->>TaskQueue: Poll for next work

⚙️ Worker Components

Each worker consists of:

# Worker registration
from temporalio.worker import Worker, UnsandboxedWorkflowRunner

worker = Worker(
    client,                              # Temporal client connection
    task_queue="agency-task-queue",      # Named queue to poll
    workflows=[GenericStrategyWorkflow], # Workflows this worker can run
    activities=[execute_strategy_directly], # Activities this worker can execute
    max_concurrent_activities=10,        # Concurrency limit
    workflow_runner=UnsandboxedWorkflowRunner(), # Execution environment
)
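
Registration only declares what the worker can run; it still needs a client connection and a call to run(). A minimal sketch with the plain temporalio SDK (the import path for GenericStrategyWorkflow and execute_strategy_directly follows the examples later on this page):

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker, UnsandboxedWorkflowRunner

from agency.temporal.core import GenericStrategyWorkflow, execute_strategy_directly

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="agency-task-queue",
        workflows=[GenericStrategyWorkflow],
        activities=[execute_strategy_directly],
        max_concurrent_activities=10,
        workflow_runner=UnsandboxedWorkflowRunner(),
    )
    # run() blocks and keeps polling the task queue until cancelled
    await worker.run()

asyncio.run(main())

The TemporalWorkerManager described next wraps this connect/register/run cycle behind a simpler interface.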

TemporalWorkerManager

The TemporalWorkerManager provides a high-level interface for managing Agency workers:

🚀 Basic Worker Setup

from agency.temporal.core import TemporalWorkerManager

# Create a worker manager
worker_manager = TemporalWorkerManager(task_queue="agency-task-queue")

# Start the worker
worker = await worker_manager.start_worker(
    temporal_address="localhost:7233",
    max_concurrent_activities=10
)

# Run the worker (this blocks)
await worker_manager.run_worker()

📊 Worker Output

When you start a worker, you'll see:

🔧 Temporal v2 worker started on task queue: agency-task-queue
📋 Available strategies: ['router', 'sequential', 'map_reduce', 'specialist']
🎯 Workflow: GenericStrategyWorkflow
🏃 Activity: execute_strategy_directly
⚙️ Max concurrent activities: 10
🚀 Starting Temporal v2 worker for task queue: agency-task-queue

🛠️ Advanced Configuration

# Custom configuration
worker_manager = TemporalWorkerManager(
    task_queue="gpu-intensive-queue"  # Specialized queue
)

worker = await worker_manager.start_worker(
    temporal_address="production.temporal.io:7233",
    max_concurrent_activities=5,  # Limit for resource-intensive tasks
)

# Get client for workflow execution
client = await worker_manager.get_client()

Worker Pool Management

๐Ÿ—๏ธ Persistent Worker Pool

The framework includes a persistent worker pool for production deployments:

from agency.temporal.persistent_worker_pool import PersistentWorkerPool

# Create a pool of workers
pool = PersistentWorkerPool(
    num_workers=5,
    task_queue="agency-task-queue",
    temporal_address="localhost:7233",
    max_concurrent_activities=10
)

# Start all workers
await pool.start()

# Workers run indefinitely, automatically restart on failure
# Stop gracefully
await pool.stop()

📊 Pool Monitoring

# Monitor pool health
status = await pool.get_status()
print(f"Active workers: {status['active_workers']}")
print(f"Failed workers: {status['failed_workers']}")
print(f"Total tasks processed: {status['total_tasks']}")

# Restart failed workers
await pool.restart_failed_workers()
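
These calls can be combined into a simple supervision loop; a minimal sketch (the 30-second interval is an arbitrary choice):

import asyncio

async def supervise(pool, interval_seconds: int = 30):
    # Periodically check pool health and restart any failed workers
    while True:
        status = await pool.get_status()
        if status["failed_workers"]:
            await pool.restart_failed_workers()
        await asyncio.sleep(interval_seconds)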

Development Workflow

🧪 Local Development

1. Start Temporal Server

# Using Docker Compose
docker-compose up temporal temporal-postgres temporal-web

# Temporal server: localhost:7233
# Temporal UI: http://localhost:8080

2. Register Your Strategies

# In your application startup
from agency.temporal.core import register_strategy
from agency.strategies import RouterStrategy, SequentialStrategy

# Register the existing strategy classes under the names the workflow will use
register_strategy("router")(RouterStrategy)
register_strategy("sequential")(SequentialStrategy)

3. Start Worker

# Using the provided worker script
poetry run python run_agency_worker.py

# Or create your own worker script:
# custom_worker.py
import asyncio
from agency.temporal.core import TemporalWorkerManager
from agency.strategies import register_all_strategies

async def main():
    # Register all your strategies
    register_all_strategies()

    # Start worker
    worker_manager = TemporalWorkerManager("my-task-queue")
    worker = await worker_manager.start_worker()

    print("Worker started, press Ctrl+C to stop")
    try:
        await worker_manager.run_worker()
    except KeyboardInterrupt:
        print("Stopping worker...")
        await worker_manager.stop_worker()

if __name__ == "__main__":
    asyncio.run(main())

4. Test with Client

# test_client.py
import asyncio
import uuid

from temporalio.client import Client
from agency.temporal.core import GenericStrategyWorkflow

async def test_execution():
    client = await Client.connect("localhost:7233")

    result = await client.execute_workflow(
        GenericStrategyWorkflow.run,
        args=[
            "router",
            "Test input",
            {"service_name": "Test Service", "langfuse_mode": "local"}
        ],
        id=f"test-{uuid.uuid4()}",
        task_queue="my-task-queue",
    )

    print(f"Result: {result}")

asyncio.run(test_execution())

Task Queue Configuration

📋 Task Queue Strategy

Task queues enable flexible worker deployment:

Single Queue (Simple)

# All workers use the same queue
task_queue = "agency-task-queue"

# Good for:
# - Simple deployments
# - Development
# - Small teams

Specialized Queues (Advanced)

# Different queues for different workloads
queues = {
    "fast-queries": "quick-response-queue",     # < 5 second responses
    "complex-analysis": "long-running-queue",   # > 30 second operations
    "gpu-inference": "gpu-worker-queue",        # GPU-intensive work
}

# Deploy workers with different capabilities to different queues

🚀 Queue-based Scaling

# Scale different types of work independently
# 10 fast workers
for i in range(10):
    worker_manager = TemporalWorkerManager("quick-response-queue")
    await worker_manager.start_worker(max_concurrent_activities=20)

# 3 GPU workers
for i in range(3):
    worker_manager = TemporalWorkerManager("gpu-worker-queue")
    await worker_manager.start_worker(max_concurrent_activities=2)

Worker Configuration

⚙️ Concurrency Settings

# High concurrency for I/O-bound work (API calls, LLM requests)
worker = await worker_manager.start_worker(
    max_concurrent_activities=50  # Many concurrent activities
)

# Low concurrency for CPU/GPU-bound work
worker = await worker_manager.start_worker(
    max_concurrent_activities=2   # Limited concurrent activities
)

🔧 Resource Limits

# Memory-constrained environments
worker = Worker(
    client,
    task_queue="memory-limited-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
    max_concurrent_activities=3,
    max_cached_workflows=100,  # Limit workflow cache
    workflow_runner=UnsandboxedWorkflowRunner(),
)

โฑ๏ธ Timeout Configuration

# Heartbeat throttling on the worker (activity timeouts are set where the
# workflow schedules the activity; see the sketch below)
from datetime import timedelta

worker = Worker(
    client,
    task_queue="agency-task-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
    max_heartbeat_throttle_interval=timedelta(seconds=60),
    default_heartbeat_throttle_interval=timedelta(seconds=30),
)
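
Activity timeouts are not worker settings; they are passed when the workflow schedules the activity. A hypothetical workflow sketch (ExampleStrategyWorkflow is illustrative, not the framework's GenericStrategyWorkflow, and the argument order is assumed):

from datetime import timedelta
from temporalio import workflow

from agency.temporal.core import execute_strategy_directly

@workflow.defn
class ExampleStrategyWorkflow:
    @workflow.run
    async def run(self, strategy_name: str, input_data: str, config: dict) -> dict:
        # Timeouts and heartbeat expectations are applied per activity invocation
        return await workflow.execute_activity(
            execute_strategy_directly,
            args=[strategy_name, input_data, config],
            start_to_close_timeout=timedelta(minutes=10),
            heartbeat_timeout=timedelta(seconds=90),
        )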

Production Deployment

🌐 Deployment Patterns

Container-based Deployment

# Dockerfile for Agency worker
FROM python:3.10-slim

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code
COPY . /app
WORKDIR /app

# Register strategies and start worker
CMD ["python", "production_worker.py"]
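
production_worker.py is referenced but not shown in this guide; a hypothetical sketch that reads the environment variables set in the Kubernetes manifest below:

# production_worker.py (hypothetical sketch)
import asyncio
import os

from agency.temporal.core import TemporalWorkerManager
from agency.strategies import register_all_strategies

async def main():
    # Register strategies before the worker starts polling
    register_all_strategies()

    worker_manager = TemporalWorkerManager(
        task_queue=os.environ.get("TASK_QUEUE", "agency-task-queue")
    )
    await worker_manager.start_worker(
        temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        max_concurrent_activities=int(os.environ.get("MAX_CONCURRENT_ACTIVITIES", "10")),
    )
    await worker_manager.run_worker()

if __name__ == "__main__":
    asyncio.run(main())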

Kubernetes Deployment

# agency-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agency-worker
spec:
  replicas: 5  # Scale based on load
  selector:
    matchLabels:
      app: agency-worker
  template:
    metadata:
      labels:
        app: agency-worker
    spec:
      containers:
      - name: worker
        image: agency-worker:latest
        env:
        - name: TEMPORAL_ADDRESS
          value: "temporal.production.svc.cluster.local:7233"
        - name: TASK_QUEUE
          value: "agency-task-queue"
        - name: MAX_CONCURRENT_ACTIVITIES
          value: "10"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"

Auto-scaling Configuration

# agency-worker-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agency-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agency-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: temporal_task_queue_depth
      target:
        type: AverageValue
        averageValue: "10"

📊 Production Monitoring

# Enhanced production worker with metrics
import prometheus_client

from agency.temporal.core import TemporalWorkerManager

class ProductionWorkerManager(TemporalWorkerManager):
    def __init__(self, task_queue: str):
        super().__init__(task_queue)
        self.metrics = {
            'tasks_processed': prometheus_client.Counter('tasks_processed_total', 'Total tasks processed'),
            'task_duration': prometheus_client.Histogram('task_duration_seconds', 'Task duration in seconds'),
            'active_workers': prometheus_client.Gauge('active_workers', 'Number of active workers'),
        }

    async def start_worker(self, **kwargs):
        worker = await super().start_worker(**kwargs)

        # Expose metrics for Prometheus to scrape
        prometheus_client.start_http_server(8000)

        return worker
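
How these metrics get updated is up to the application; a hypothetical sketch that times each strategy execution (timed_execute is illustrative, not part of the framework):

import time

from agency.temporal.core import execute_strategy_directly

async def timed_execute(metrics, strategy_name, input_data, processor_config):
    # Record a duration sample and count each completed call
    start = time.monotonic()
    try:
        return await execute_strategy_directly(
            strategy_name=strategy_name,
            input_data=input_data,
            processor_config=processor_config,
        )
    finally:
        metrics['task_duration'].observe(time.monotonic() - start)
        metrics['tasks_processed'].inc()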

Health Checks and Monitoring

๐Ÿฅ Worker Health Checks

# Health check endpoint for workers
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from agency.temporal.worker_manager import WorkerManager

app = FastAPI()
worker_manager: WorkerManager | None = None

@app.get("/health")
async def health_check():
    if worker_manager and worker_manager.is_healthy():
        return {"status": "healthy", "worker_count": worker_manager.active_workers}
    return JSONResponse(status_code=503, content={"status": "unhealthy"})

@app.get("/metrics")
async def get_metrics():
    if worker_manager:
        return {
            "tasks_processed": worker_manager.tasks_processed,
            "average_task_duration": worker_manager.average_task_duration,
            "error_rate": worker_manager.error_rate,
        }
    return JSONResponse(status_code=503, content={"error": "Worker not available"})

📊 Temporal UI Monitoring

Access the Temporal UI to monitor:

  • Worker Status: See which workers are active
  • Task Queue Depth: Monitor backlog
  • Workflow Execution: Track success/failure rates
  • Performance Metrics: Average execution times

# Local Temporal UI
open http://localhost:8080

# Look for:
# - Workers tab: Active worker instances
# - Task Queues: Queue depth and processing rates
# - Workflows: Execution history and performance

🚨 Alerting

Set up alerts for key metrics:

# Example alerting thresholds
ALERTS = {
    "worker_down": "No workers active for > 1 minute",
    "queue_depth": "Task queue depth > 100 items",
    "error_rate": "Error rate > 5% over 5 minutes",
    "latency": "P95 latency > 30 seconds",
}

Troubleshooting

🔧 Common Issues

Worker Not Starting

# Check Temporal server connectivity (port 7233 is gRPC, so plain curl won't
# return anything useful; a TCP-level check is enough)
nc -zv localhost 7233

# Check worker logs
poetry run python run_agency_worker.py --verbose

# Verify task queue name
grep "task_queue" worker_config.py

No Tasks Being Processed

# Check if workflows are being scheduled to correct task queue
# In Temporal UI: Workflows > Task Queue column

# Verify worker is polling correct queue
grep "Temporal v2 worker started on task queue" worker.log

# Check for worker errors
grep ERROR worker.log

High Memory Usage

# Reduce concurrent activities
max_concurrent_activities=5  # Down from 10

# Limit workflow cache
max_cached_workflows=50  # Down from default

# Use workflow.continue_as_new for long-running workflows
# (continue_as_new raises and never returns, so it is not awaited)
if iteration_count > 1000:
    workflow.continue_as_new(remaining_data)

Activity Timeouts

# Increase the activity timeout where the workflow schedules the activity
start_to_close_timeout=timedelta(minutes=10)  # Up from 2 minutes

# Add heartbeats for long-running activities
from temporalio import activity

@activity.defn
async def long_running_activity():
    for i in range(100):
        # Do work, reporting progress via heartbeat
        activity.heartbeat(f"Processing item {i}")
        await process_item(i)

๐Ÿ› Debugging Techniques

Enable Debug Logging

import logging
logging.basicConfig(level=logging.DEBUG)

# The Temporal Python SDK logs through the standard `temporalio` logger hierarchy
logging.getLogger("temporalio").setLevel(logging.DEBUG)

# For step-through debugging, construct the Worker with debug_mode=True,
# which disables the workflow deadlock detector

Use Local Testing

# Test activities directly without Temporal
from agency.temporal.core import execute_strategy_directly

result = await execute_strategy_directly(
    strategy_name="router",
    input_data="test input",
    processor_config={"service_name": "test"},
)
print(result)

Check Worker Resources

# Monitor worker resource usage
docker stats agency-worker

# Check system resources
htop

Best Practices

✅ Do

# Use descriptive task queue names
task_queue = "customer-support-agents"  # Clear purpose

# Set appropriate concurrency limits
max_concurrent_activities = min(cpu_cores * 2, available_memory_gb * 5)

# Include health checks
@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": datetime.utcnow()}

# Implement graceful shutdown
async def shutdown_handler():
    await worker_manager.stop_worker()
    await cleanup_resources()

# Monitor key metrics
worker_start_time = time.time()
tasks_processed = 0

โŒ Avoid

# Don't use generic task queue names
task_queue = "queue1"  # Not descriptive

# Don't set unlimited concurrency
max_concurrent_activities = float('inf')  # Will exhaust resources

# Don't ignore worker failures
try:
    await worker.run()
except Exception:
    pass  # Silent failure is bad

# Don't hardcode configuration
temporal_address = "localhost:7233"  # Use environment variables

Worker Deployment Strategy

  • Development: Single worker on local machine
  • Staging: Small worker pool (2-3 workers) with monitoring
  • Production: Auto-scaling worker pool with full observability

Performance Optimization

  • I/O-bound work: High concurrency (20-50 activities)
  • CPU-bound work: Low concurrency (1-5 activities)
  • Mixed workloads: Use specialized task queues
  • Long operations: Add heartbeats and use continue_as_new

Resource Management

  • Workers consume memory proportional to concurrent activities
  • Each activity uses a thread or async task
  • Monitor memory usage and set appropriate limits
  • Use graceful shutdown to avoid data loss (see the sketch below)
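
A minimal sketch of wiring graceful shutdown to process signals, assuming stop_worker() causes run_worker() to return:

import asyncio
import signal

from agency.temporal.core import TemporalWorkerManager

async def main():
    worker_manager = TemporalWorkerManager("agency-task-queue")
    await worker_manager.start_worker()

    # Translate SIGTERM/SIGINT into a clean shutdown instead of killing mid-task
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)

    run_task = asyncio.create_task(worker_manager.run_worker())
    await stop.wait()
    await worker_manager.stop_worker()
    await run_task  # assumes stop_worker() unblocks run_worker()

if __name__ == "__main__":
    asyncio.run(main())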