Skip to content

Temporal Workflows

Temporal is a workflow orchestration platform that brings reliability, scalability, and visibility to the digitalNXT Agency framework. It enables durable execution of complex agent strategies with built-in fault tolerance and retry mechanisms.

What is Temporal?

Temporal is an open-source workflow orchestration platform that ensures your code runs reliably, even in the face of failures. It provides:

  • Durable Execution: Workflows automatically resume from the last checkpoint after crashes
  • Built-in Retries: Configurable retry policies for transient failures
  • Visibility: Complete history of workflow execution
  • Scalability: Horizontal scaling through worker pools
  • Language Agnostic: SDKs for multiple programming languages

Why Temporal for Agent Orchestration?

Traditional agent execution faces several challenges that Temporal elegantly solves:

โŒ Without Temporal

  • Agent failures require manual intervention
  • Complex retry logic must be hand-coded
  • No visibility into long-running operations
  • Difficult to scale agent execution
  • State management is complex

โœ… With Temporal

  • Automatic recovery from failures
  • Declarative retry policies
  • Full execution history and debugging
  • Horizontal scaling via workers
  • State automatically persisted

Core Concepts

๐Ÿ”„ Workflows

Workflows are the orchestration logic that coordinates activities. In Agency, we use a single GenericStrategyWorkflow that can execute any registered strategy.

@workflow.defn
class GenericStrategyWorkflow:
    """Universal workflow for executing any Agency strategy."""

    @workflow.run
    async def run(
        self,
        strategy_name: str,
        input_data: str,
        processor_config: dict,
        strategy_overrides: dict = None,
        context: dict = None,
        timeout_seconds: int = 120,
    ) -> StrategyResult:
        """Execute strategy with full observability and fault tolerance."""
        workflow.logger.info(f"๐Ÿ”„ Starting workflow for strategy: {strategy_name}")

        result = await workflow.execute_activity(
            execute_strategy_directly,
            args=[strategy_name, input_data, processor_config, strategy_overrides, context],
            start_to_close_timeout=timedelta(seconds=timeout_seconds),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
                maximum_attempts=3,
            ),
        )

        return result

โš™๏ธ Activities

Activities are the actual units of work - in our case, executing agent strategies. Activities can be retried, have timeouts, and are where the real processing happens.

@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    strategy_overrides: dict = None,
    context: dict = None
) -> StrategyResult:
    """Execute any registered strategy with full observability."""

    # Get strategy from registry
    strategy_class = StrategyRegistry.get(strategy_name)
    strategy = strategy_class(**strategy_overrides)

    # Create processor with configuration
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
    )

    # Execute strategy
    strategy.set_processor(processor)
    result = await strategy.execute(input_data, context)

    return result

๐Ÿ‘ท Workers

Workers are processes that execute workflows and activities. They poll task queues for work and execute it.

# Start a worker that can handle any strategy
worker_manager = TemporalWorkerManager(task_queue="agency-task-queue")
worker = await worker_manager.start_worker(
    temporal_address="localhost:7233",
    max_concurrent_activities=10
)
await worker.run()

๐Ÿ“‹ Task Queues

Task queues are named queues that route work to specific workers. This enables:

  • Load balancing across multiple workers
  • Specialized workers for different strategy types
  • Versioning and gradual rollouts

Architecture Overview

graph TB
    subgraph "Client Application"
        App[Application Code]
    end

    subgraph "Temporal Server"
        TServer[Temporal Server]
        TaskQueue[Task Queue: agency-task-queue]
        History[Workflow History]
    end

    subgraph "Worker Pool"
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker N...]
    end

    subgraph "Agency Framework"
        Registry[Strategy Registry]
        Strategies[Registered Strategies]
        Processor[Processor with Observability]
    end

    App -->|Start Workflow| TServer
    TServer --> TaskQueue
    TaskQueue -->|Poll for Work| W1
    TaskQueue -->|Poll for Work| W2
    TaskQueue -->|Poll for Work| W3

    W1 --> Registry
    W2 --> Registry
    W3 --> Registry

    Registry --> Strategies
    Strategies --> Processor

    W1 -->|Update Status| History
    W2 -->|Update Status| History
    W3 -->|Update Status| History

    classDef temporal fill:#e3f2fd
    classDef worker fill:#e8f5e8
    classDef agency fill:#fff3e0

    class TServer,TaskQueue,History temporal
    class W1,W2,W3 worker
    class Registry,Strategies,Processor agency

Strategy Registry Pattern

The Strategy Registry eliminates code duplication by providing a single workflow and activity that can execute any strategy:

๐Ÿ“ Registering Strategies

from agency.temporal.core import register_strategy
from agency.strategies import RouterStrategy, SequentialStrategy, MapReduceStrategy

# Register strategies at startup
@register_strategy("router")
class RouterStrategy(Strategy):
    pass

@register_strategy("sequential")
class SequentialStrategy(Strategy):
    pass

@register_strategy("map_reduce")
class MapReduceStrategy(Strategy):
    pass

๐Ÿš€ Executing via Temporal

from temporalio.client import Client

# Connect to Temporal
client = await Client.connect("localhost:7233")

# Execute any registered strategy
result = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=[
        "router",  # Strategy name
        "Process this customer inquiry",  # Input
        {
            "service_name": "Customer Support",
            "langfuse_mode": "auto",
            "session_id": "session_123",
            "user_id": "user_456"
        },  # Processor config
        {"model": "gpt-4"},  # Strategy overrides
        {"source": "api", "priority": "high"}  # Context
    ],
    id=f"customer-inquiry-{uuid.uuid4()}",
    task_queue="agency-task-queue",
)

Execution Modes Comparison

๐Ÿ’ซ Direct Execution

Traditional synchronous execution without Temporal:

# Direct execution - no fault tolerance
strategy = RouterStrategy()
processor = Processor()
strategy.set_processor(processor)
result = await strategy.execute("input", context)

Pros:

  • Simple and straightforward
  • Lower latency
  • No infrastructure required

Cons:

  • No automatic retries
  • No fault tolerance
  • Limited visibility
  • Difficult to scale

๐Ÿ”„ Temporal Execution

Durable execution through Temporal:

# Temporal execution - full fault tolerance
result = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["router", "input", processor_config],
    id=workflow_id,
    task_queue="agency-task-queue",
)

Pros:

  • Automatic retries and fault tolerance
  • Complete execution history
  • Horizontal scaling
  • Long-running operation support
  • Built-in timeouts

Cons:

  • Additional infrastructure (Temporal server)
  • Slightly higher latency
  • Learning curve

Benefits for Complex Strategies

๐Ÿ›ก๏ธ Fault Tolerance

Temporal automatically handles failures:

# Configure retry policy for transient failures
retry_policy=RetryPolicy(
    initial_interval=timedelta(seconds=1),
    maximum_interval=timedelta(seconds=10),
    maximum_attempts=3,
    backoff_coefficient=2.0,
)

# Workflows automatically resume from last checkpoint
# Even if the worker crashes, another worker picks up

๐Ÿ“ˆ Scalability

Scale horizontally by adding more workers:

# Start multiple workers on different machines
for i in range(num_workers):
    worker = await worker_manager.start_worker(
        temporal_address="temporal-server:7233",
        max_concurrent_activities=10
    )
    await worker.run()

๐Ÿ” Visibility

Full execution history in Temporal UI:

  • View all workflow executions
  • See detailed execution timeline
  • Inspect input/output of each activity
  • Debug failures with stack traces
  • Monitor performance metrics

โฐ Long-Running Operations

Perfect for complex agent chains:

# Workflows can run for hours, days, or longer
result = await client.execute_workflow(
    GenericStrategyWorkflow.run,
    args=["complex_analysis", large_dataset, config],
    id="long-running-analysis",
    task_queue="agency-task-queue",
    execution_timeout=timedelta(hours=24),  # Can run up to 24 hours
)

๐Ÿ”„ Workflow Versioning

Safely deploy changes without disrupting running workflows:

# Use workflow versioning for zero-downtime deployments
version = workflow.get_version("strategy_update_v2", 1, 2)
if version == 1:
    # Old logic
    result = await old_strategy_execution()
else:
    # New logic
    result = await new_strategy_execution()

Session and Context Propagation

Temporal maintains context across distributed execution:

# Context flows through the entire workflow
context = {
    "workflow_id": workflow.info().workflow_id,
    "workflow_run_id": workflow.info().run_id,
    "execution_context": "temporal_activity",
    "execution_mode": "temporal",
    "source": "api",
    "user_id": "user_123"
}

# Sessions are recreated in activities
session = SessionFactory.create_user_session(
    user_id=session_config.get("user_id"),
    base_dir="temp_sessions"
)

Error Handling

๐Ÿšจ Activity Failures

Activities automatically retry with exponential backoff:

try:
    result = await workflow.execute_activity(
        execute_strategy_directly,
        args=[strategy_name, input_data, config],
        retry_policy=RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=10),
            maximum_attempts=3,
            non_retryable_error_types=["ValueError", "TypeError"],
        ),
    )
except ActivityError as e:
    workflow.logger.error(f"Activity failed after retries: {e}")
    # Implement compensation logic or alternative strategy

๐Ÿ“Š Monitoring and Alerting

# Add custom metrics to workflows
workflow.metric_meter().create_counter("strategies_executed").add(1)
workflow.metric_meter().create_histogram("execution_duration").record(duration)

# Query workflow state
handle = client.get_workflow_handle(workflow_id)
description = await handle.describe()
if description.status == WorkflowStatus.FAILED:
    # Trigger alert
    notify_ops_team(workflow_id, description.failure_info)

Agency Configuration for Temporal

The Agency framework provides a seamless integration with Temporal workflows through its unified API. You can configure an Agency to support both direct execution and Temporal workflow execution.

๐Ÿ—๏ธ Agency Architecture with Temporal

graph TB
    subgraph "Agency Layer"
        AgencyClass[Agency Class]
        Config[Agency Config]
        Session[SQLiteSession]
    end

    subgraph "Execution Layer"
        DirectMode[Direct Execution]
        TemporalMode[Temporal Execution]
    end

    subgraph "Strategy Layer"
        Strategy[Strategy Instance]
        Registry[Strategy Registry]
    end

    subgraph "Processor Layer"
        Processor[Processor with Observability]
        LangfuseClient[Langfuse Client]
    end

    subgraph "Temporal Infrastructure"
        WorkflowExec[Workflow Execution]
        ActivityExec[Activity Execution]
        Workers[Temporal Workers]
    end

    AgencyClass --> Config
    AgencyClass --> Session
    AgencyClass --> DirectMode
    AgencyClass --> TemporalMode

    DirectMode --> Strategy
    TemporalMode --> Registry

    Strategy --> Processor
    Registry --> ActivityExec

    ActivityExec --> Processor
    Processor --> LangfuseClient

    TemporalMode --> WorkflowExec
    WorkflowExec --> Workers
    Workers --> ActivityExec

    classDef agency fill:#e3f2fd
    classDef execution fill:#fff3e0
    classDef strategy fill:#e8f5e8
    classDef processor fill:#fce4ec
    classDef temporal fill:#f3e5f5

    class AgencyClass,Config,Session agency
    class DirectMode,TemporalMode execution
    class Strategy,Registry strategy
    class Processor,LangfuseClient processor
    class WorkflowExec,ActivityExec,Workers temporal

๐Ÿ”ง Basic Agency Configuration

Direct Execution (Backward Compatible)

from agency import Agency, RouterStrategy
from agency.sessions.session_factory import SessionFactory

# Create a session for conversation memory
session = SessionFactory.create_file_session(
    session_id="user_123_session",
    db_path="data/sessions/user_123.db"
)

# Traditional direct execution agency
agency = Agency(
    name="Customer Support Agency",
    strategy=RouterStrategy(),  # Direct strategy instance
    session=session,
    user_id="user_123",
    langfuse_mode=LangfuseMode.LOCAL
)

# This runs directly without Temporal
result = await agency.run("I need help with my billing")

Temporal-Enabled Agency

from agency import Agency
from agency.temporal.core import register_strategy
from agency.strategies import RouterStrategy

# Register strategy for Temporal execution
@register_strategy("router")
class RouterStrategy(Strategy):
    pass

# Create Temporal-enabled agency
agency = Agency(
    name="Customer Support Agency",
    strategy_name="router",  # Use registered strategy name
    temporal_config={
        "enabled": True,
        "temporal_address": "localhost:7233",
        "task_queue": "customer-support-queue",
        "timeout_seconds": 300
    },
    session=session,
    user_id="user_123",
    langfuse_mode=LangfuseMode.LOCAL
)

# This runs via Temporal workflow
result = await agency.run("Complex multi-step inquiry", use_temporal=True)

Hybrid Agency (Both Modes)

# Agency supporting both execution modes
agency = Agency(
    name="Customer Support Agency",
    strategy=RouterStrategy(),      # For direct execution
    strategy_name="router",         # For Temporal execution
    temporal_config={"enabled": True},
    session=session,
    user_id="user_123"
)

# Choose execution mode at runtime
quick_result = await agency.run("Simple question")  # Direct execution
complex_result = await agency.run("Complex workflow", use_temporal=True)  # Temporal

โš™๏ธ Configuration Options

Temporal Configuration

temporal_config = {
    "enabled": True,
    "temporal_address": "localhost:7233",        # Temporal server address
    "task_queue": "agency-task-queue",          # Task queue for workflows
    "timeout_seconds": 300,                     # Activity timeout
    "retry_policy": {                           # Custom retry policy
        "initial_interval": 1,
        "maximum_interval": 10,
        "maximum_attempts": 3,
        "backoff_coefficient": 2.0
    },
    "auto_start_worker": True,                  # Auto-manage workers
    "max_concurrent_activities": 10             # Worker concurrency
}

Session Configuration

# Memory session (temporary, for testing)
session = SessionFactory.create_memory_session("test_session")

# File session (persistent across restarts)
session = SessionFactory.create_file_session(
    session_id="production_session_123",
    db_path="data/sessions/production_session_123.db"
)

# Load existing session or create new
session = SessionFactory.load_or_create_session(
    session_id="user_456_conversation",
    db_path="data/sessions/user_456.db"
)

Langfuse Observability Configuration

# Auto-detect best option (local โ†’ remote โ†’ basic logging)
agency = Agency(..., langfuse_mode=LangfuseMode.AUTO)

# Force local development Langfuse
agency = Agency(..., langfuse_mode=LangfuseMode.LOCAL)

# Use production Langfuse
agency = Agency(..., langfuse_mode=LangfuseMode.REMOTE)

# Disable observability for maximum performance
agency = Agency(..., langfuse_mode=LangfuseMode.DISABLED)

Session Flow Architecture

Sessions (SQLiteSession) provide persistent conversation memory that flows seamlessly between Agency, strategies, processors, and Temporal activities.

๐Ÿ“Š Session Data Flow

sequenceDiagram
    participant App as Application
    participant Agency as Agency
    participant Strategy as Strategy
    participant Processor as Processor
    participant Activity as Temporal Activity
    participant Session as SQLiteSession

    App->>Agency: Create with session
    Note over Agency,Session: Session attached to Agency

    App->>Agency: run("User message")
    Agency->>Strategy: execute(input, context)
    Strategy->>Processor: run(agent, input, context)

    alt Direct Execution
        Processor->>Session: Load conversation history
        Session-->>Processor: Previous messages
        Processor->>Session: Save new messages
        Session-->>Processor: Confirmation
        Processor-->>Strategy: Agent result
    else Temporal Execution
        Processor->>Activity: execute_strategy_directly()
        Activity->>Session: Recreate session from config
        Note over Activity,Session: Session config contains:<br/>- session_type (memory/file)<br/>- user_id<br/>- db_path (if file)<br/>- langfuse_session_id

        Session-->>Activity: Session instance
        Activity->>Session: Load conversation history
        Session-->>Activity: Previous messages
        Activity->>Session: Save new messages
        Session-->>Activity: Confirmation
        Activity-->>Processor: Activity result
    end

    Strategy-->>Agency: Strategy result
    Agency-->>App: Final result

    Note over App,Session: Session persists across all executions<br/>maintaining conversation continuity

๐Ÿ”„ Session Persistence in Temporal

Session Configuration Serialization

# When creating a Temporal workflow, session info is serialized
processor_config = {
    "service_name": "Customer Support",
    "langfuse_mode": "local",
    "session_id": langfuse_session_id,
    "user_id": user_id,
    "session_config": {
        "session_type": "file",           # or "memory"
        "session_id": session.session_id,
        "user_id": user_id,
        "db_path": "/data/sessions/user_123.db",
        "langfuse_session_id": langfuse_session_id
    }
}

Session Recreation in Activities

@activity.defn
async def execute_strategy_directly(
    strategy_name: str,
    input_data: str,
    processor_config: dict,
    **kwargs
) -> StrategyResult:
    """Activity recreates session from configuration."""

    # Extract session configuration
    session_config = processor_config.get("session_config")
    session = None

    if session_config:
        session_type = session_config.get("session_type", "memory")

        if session_type == "memory":
            # For memory sessions, use user_id to maintain continuity
            user_id = session_config.get("user_id")
            session = SessionFactory.create_user_session(user_id, "temp_sessions")

        elif session_type == "file":
            # For file sessions, recreate using session_id and db_path
            session_id = session_config.get("session_id")
            db_path = session_config.get("db_path")
            if session_id and db_path:
                session = SessionFactory.load_or_create_session(session_id, db_path)

    # Create processor with recreated session
    processor = Processor(
        service_name=processor_config.get("service_name"),
        langfuse_mode=LangfuseMode(processor_config.get("langfuse_mode")),
        session_id=processor_config.get("session_id"),
        user_id=processor_config.get("user_id"),
        session=session  # Recreated session maintains conversation history
    )

    # Execute strategy with persistent session
    strategy_class = StrategyRegistry.get(strategy_name)
    strategy = strategy_class()
    strategy.set_processor(processor)

    result = await strategy.execute(input_data, context)
    return result

๐Ÿ’พ Session Types and Use Cases

Memory Sessions

# Temporary sessions (testing, ephemeral conversations)
session = SessionFactory.create_memory_session("temp_session")

# Use case: Unit tests, temporary demos, stateless interactions
agency = Agency(
    name="Test Agency",
    strategy=TestStrategy(),
    session=session
)

File Sessions

# Persistent sessions (production, long-term conversations)
session = SessionFactory.create_file_session(
    session_id="customer_support_user_123",
    db_path="data/sessions/customer_support_user_123.db"
)

# Use case: Customer support, ongoing conversations, production systems
agency = Agency(
    name="Customer Support",
    strategy=SupportStrategy(),
    session=session,
    user_id="user_123"
)

User-Based Sessions

# Automatic session management per user
from agency.sessions.session_factory import SessionFactory

def create_user_agency(user_id: str) -> Agency:
    """Create an agency with persistent session for a user."""

    # Create or load existing session for user
    session = SessionFactory.load_or_create_session(
        session_id=f"user_{user_id}_main_session",
        db_path=f"data/sessions/user_{user_id}.db"
    )

    return Agency(
        name=f"Personal Assistant for {user_id}",
        strategy_name="router",
        temporal_config={"enabled": True},
        session=session,
        user_id=user_id,
        langfuse_mode=LangfuseMode.AUTO
    )

# Usage
agency = create_user_agency("user_123")
result1 = await agency.run("What's my order status?")    # First interaction
result2 = await agency.run("Thanks for the update!")    # Continues conversation

๐Ÿ” Session Debugging and Monitoring

Session Information

# Get session info from agency
session_info = agency.get_session_info()
print(f"Session ID: {session_info['session_id']}")
print(f"Message count: {session_info['message_count']}")
print(f"Created: {session_info['created_at']}")

# Direct session access
if agency.session:
    messages = agency.session.get_messages()
    print(f"Conversation history: {len(messages)} messages")

Session Persistence Verification

# Test session persistence across Temporal activities
async def test_session_persistence():
    agency = Agency(
        name="Test",
        strategy_name="router",
        session=SessionFactory.create_file_session("test", "test.db"),
        temporal_config={"enabled": True}
    )

    # First interaction
    result1 = await agency.run("Remember that I like coffee", use_temporal=True)

    # Second interaction (should remember context)
    result2 = await agency.run("What do I like to drink?", use_temporal=True)

    # Verify session contains both interactions
    messages = agency.session.get_messages()
    assert len(messages) >= 4  # 2 user messages + 2 assistant responses

Best Practices

โœ… Do

# Use meaningful workflow IDs
workflow_id = f"customer-{customer_id}-inquiry-{timestamp}"

# Set appropriate timeouts
start_to_close_timeout=timedelta(seconds=30)  # Activity timeout
execution_timeout=timedelta(minutes=5)  # Total workflow timeout

# Include context for debugging
context = {
    "request_id": request_id,
    "user_id": user_id,
    "source": "api",
    "version": "1.0"
}

# Use task queue routing for specialized workers
task_queue = "gpu-intensive-queue" if needs_gpu else "default-queue"

โŒ Avoid

# Don't use random workflow IDs
workflow_id = str(uuid.uuid4())  # Hard to track

# Don't set infinite timeouts
start_to_close_timeout=None  # Can hang forever

# Don't ignore retry policies
retry_policy=None  # No automatic recovery

# Don't hardcode configuration
temporal_address="localhost:7233"  # Use environment variables

Local Development Setup

๐Ÿณ Docker Compose

Start the full Temporal stack locally:

# Start Temporal, PostgreSQL, and Web UI
docker-compose up temporal temporal-postgres temporal-web

# Access Temporal UI
open http://localhost:8080

# Temporal server available at
localhost:7233

๐Ÿš€ Start Workers

# Start an Agency worker
poetry run python run_agency_worker.py

# Worker output
๐Ÿ”ง Temporal v2 worker started on task queue: agency-task-queue
๐Ÿ“‹ Available strategies: ['router', 'sequential', 'map_reduce']
๐ŸŽฏ Workflow: GenericStrategyWorkflow
๐Ÿƒ Activity: execute_strategy_directly
โš™๏ธ  Max concurrent activities: 10

๐Ÿงช Test Workflow Execution

# Example client script
from temporalio.client import Client
from agency.temporal.core import GenericStrategyWorkflow

async def test_temporal_execution():
    # Connect to local Temporal
    client = await Client.connect("localhost:7233")

    # Execute a strategy
    result = await client.execute_workflow(
        GenericStrategyWorkflow.run,
        args=[
            "router",
            "Test input",
            {"service_name": "Test", "langfuse_mode": "local"},
        ],
        id=f"test-{uuid.uuid4()}",
        task_queue="agency-task-queue",
    )

    print(f"Result: {result}")

# Run the test
asyncio.run(test_temporal_execution())

Production Deployment

๐ŸŒ Temporal Cloud

For production, consider Temporal Cloud for managed infrastructure:

# Connect to Temporal Cloud
client = await Client.connect(
    "your-namespace.tmprl.cloud:7233",
    tls=TLSConfig(
        client_cert=client_cert,
        client_private_key=client_key,
    ),
)

๐Ÿ“Š Monitoring

Integrate with your observability stack:

# Prometheus metrics
worker = Worker(
    client,
    task_queue="agency-task-queue",
    workflows=[GenericStrategyWorkflow],
    activities=[execute_strategy_directly],
    metrics_endpoint="0.0.0.0:9090",  # Prometheus endpoint
)

# Custom metrics
workflow.metric_meter().create_counter("strategies_executed")
workflow.metric_meter().create_histogram("strategy_duration")

Troubleshooting

๐Ÿ” Common Issues

Worker Not Picking Up Tasks

# Check worker is connected to correct task queue
grep "task queue" worker.log

# Verify Temporal server is running
curl http://localhost:7233/health

# Check for workflow errors in UI
open http://localhost:8080

Workflow Timeouts

# Increase timeouts for long-running strategies
execution_timeout=timedelta(hours=1)
start_to_close_timeout=timedelta(minutes=30)

# Add heartbeats for long activities
activity.heartbeat("Processing batch 10 of 100")

High Memory Usage

# Limit concurrent activities
max_concurrent_activities=5  # Reduce from default

# Use workflow continuations for large datasets
if len(items) > 1000:
    # Process in batches with continue_as_new
    await workflow.continue_as_new(remaining_items)

When to Use Temporal

Use Temporal when you need:

  • Fault-tolerant execution of critical workflows
  • Complex multi-step agent orchestrations
  • Long-running operations (hours or days)
  • Automatic retries with exponential backoff
  • Complete audit trail of executions
  • Horizontal scaling across multiple workers

Use direct execution when you need:

  • Simple, fast agent calls
  • Minimal infrastructure
  • Sub-second response times
  • Development and testing