Local Activities

To reduce latency, activities can run directly in the workflow worker process instead of being scheduled through the task queue. This is useful for fast operations (under 1 second) where the scheduling overhead is significant relative to the work itself.

When to Use

Good for:

  • Quick computations, validations, data transformations
  • High-volume scenarios where latency matters
  • Operations completing in under 1 second

Avoid for:

  • Long-running operations (> a few seconds)
  • External API calls or I/O-heavy tasks
  • Operations needing separate retry isolation or scaling
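
One way to check an operation against the under-1-second guideline is to time it before moving it into a local block. The sketch below uses only the standard library; `measure`, `fits_local_budget`, and `LOCAL_BUDGET_SECONDS` are hypothetical helper names chosen for illustration, not part of the workflows library.

```python
import asyncio
import time

# Guideline from the list above: local activities should finish in under 1 second.
LOCAL_BUDGET_SECONDS = 1.0


async def measure(coro_fn, *args, runs: int = 5) -> float:
    """Return the worst wall-clock duration (seconds) observed over several runs."""
    worst = 0.0
    for _ in range(runs):
        start = time.perf_counter()
        await coro_fn(*args)
        worst = max(worst, time.perf_counter() - start)
    return worst


def fits_local_budget(duration: float, budget: float = LOCAL_BUDGET_SECONDS) -> bool:
    """True when the measured duration stays under the local-activity budget."""
    return duration < budget


# Plain coroutine standing in for an activity body (no decorator, stdlib only).
async def validate_email(email: str) -> bool:
    return "@" in email


worst = asyncio.run(measure(validate_email, "user@example.com"))
print(f"worst run: {worst:.6f}s, local candidate: {fits_local_budget(worst)}")
```

Measuring the worst run rather than the average is a deliberate choice here, since a single slow call is what ends up blocking the worker.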

Basic Usage

from mistralai import workflows
from mistralai.workflows import run_activities_locally

@workflows.activity()
async def validate_email(email: str) -> bool:
    return "@" in email

@workflows.workflow.define(name="user-workflow")
class UserWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> bool:
        with run_activities_locally():
            result = await validate_email(email)
        return result

Mixed Execution

Combine local and remote activities:

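# quick_lookup and external_api_call are assumed to be activities
# defined elsewhere with @workflows.activity()
@workflows.workflow.define(name="mixed-workflow")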
class MixedWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> dict:
        # Fast lookup runs locally
        with run_activities_locally():
            domain = await quick_lookup(email)

        # Slow operation runs as regular activity
        verified = await external_api_call(email)

        return {"domain": domain, "verified": verified}

Performance Impact

Execution Mode      Typical Latency    Best For
Local Activity      < 10ms             Quick computations, lookups
Regular Activity    50-200ms           API calls, complex logic
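
As a rough worked example of what these figures mean at volume, the sketch below multiplies the per-call latency ranges from the table by a number of sequential calls. The lower bound of 0 ms for local activities is an assumption (the table only gives "< 10ms"), and `total_overhead_seconds` is a hypothetical helper.

```python
# Per-call latency ranges in milliseconds, taken from the table above.
# The 0 ms lower bound for local activities is an assumption.
LATENCY_MS = {
    "local": (0, 10),
    "regular": (50, 200),
}


def total_overhead_seconds(mode: str, calls: int) -> tuple[float, float]:
    """Return the (low, high) total latency in seconds for sequential calls."""
    low_ms, high_ms = LATENCY_MS[mode]
    return (calls * low_ms / 1000, calls * high_ms / 1000)


calls = 1000
print("local:  ", total_overhead_seconds("local", calls))    # (0.0, 10.0)
print("regular:", total_overhead_seconds("regular", calls))  # (50.0, 200.0)
```

For 1,000 sequential calls this gives up to 10 seconds of latency locally versus 50 to 200 seconds through the task queue, which is why the difference matters mainly in high-volume workflows.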

Limitations

Local activities bypass standard scheduling, which has the following consequences:

  • No task queue isolation - activities share resources with workflows
  • Worker process blocking - slow local activities block the workflow worker
  • No independent scaling - cannot scale local activities separately
  • Limited failure isolation - worker crashes affect both workflows and local activities
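
The "worker process blocking" point can be illustrated with a plain `asyncio` event loop, assuming the worker runs activities and workflows on a single loop (an assumption for this sketch, not a statement about the library's internals). A background heartbeat stands in for other workflow work; it makes no progress while a blocking call holds the loop. All names below are hypothetical.

```python
import asyncio
import time


async def count_ticks(work, interval: float = 0.01) -> int:
    """Run `work` while a heartbeat task ticks; return how many ticks occurred."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(interval)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await work()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


async def blocking_activity():
    time.sleep(0.2)  # synchronous sleep: holds the event loop the whole time


async def cooperative_activity():
    await asyncio.sleep(0.2)  # yields to the loop, so other tasks keep running


blocked_ticks = asyncio.run(count_ticks(blocking_activity))
cooperative_ticks = asyncio.run(count_ticks(cooperative_activity))
print(f"ticks while blocking: {blocked_ticks}, while cooperative: {cooperative_ticks}")
```

The heartbeat barely advances during the blocking call, which is the same starvation other workflows would see behind a slow local activity.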

⚠️ Warning: Local activities can cause worker crashes if they fail unexpectedly. Unlike regular activities, local activity failures can bring down the entire worker process, potentially causing data loss and requiring manual intervention to restart the workflow.

Activity settings (partial support): Timeouts, retry policies, and dependency injection work the same for local activities. However, rate limiting is NOT supported for local activities.

Note: The sticky_to_worker, rate_limit, and heartbeat_timeout parameters don't apply to local activities since they already run in the workflow worker process.

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(seconds=5),
    retry_policy_max_attempts=3
)
async def timed_local_activity(data: str) -> str:
    # Timeouts and retry policies apply even when running locally
    # Rate limiting is NOT supported for local activities
    return data.upper()
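
To make the combined effect of a per-attempt timeout and a maximum attempt count concrete, here is a generic standard-library sketch of those semantics. It only illustrates the idea and is not how the library applies `start_to_close_timeout` or `retry_policy_max_attempts`; `run_with_policy` and `flaky_upper` are hypothetical names, and real retry policies typically add backoff between attempts.

```python
import asyncio
from datetime import timedelta


async def run_with_policy(fn, *args, timeout: timedelta, max_attempts: int):
    """Call `fn` up to `max_attempts` times, bounding each attempt by `timeout`."""
    last_error = None
    for _attempt in range(1, max_attempts + 1):
        try:
            # wait_for cancels the attempt and raises a timeout error on expiry.
            return await asyncio.wait_for(fn(*args), timeout=timeout.total_seconds())
        except Exception as exc:  # includes timeouts; no backoff in this sketch
            last_error = exc
    raise last_error


calls = {"count": 0}


async def flaky_upper(data: str) -> str:
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return data.upper()


result = asyncio.run(
    run_with_policy(flaky_upper, "hello", timeout=timedelta(seconds=5), max_attempts=3)
)
print(result, calls["count"])  # HELLO 3
```

With `max_attempts=3` the third call succeeds; with `max_attempts=2` the same function would surface the `RuntimeError` to the caller.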