Local Activities
For performance optimization, activities can run directly in the workflow worker process instead of being scheduled through the task queue. This is useful for fast operations (< 1 second) where scheduling overhead is significant.
When to Use
Good for:
- Quick computations, validations, data transformations
- High-volume scenarios where latency matters
- Operations completing in under 1 second
Avoid for:
- Long-running operations (> a few seconds)
- External API calls or I/O-heavy tasks
- Operations needing separate retry isolation or scaling
Basic Usage
from mistralai.workflows import run_activities_locally
@workflows.activity()
async def validate_email(email: str) -> bool:
return "@" in email
@workflows.workflow.define(name="user-workflow")
class UserWorkflow:
@workflows.workflow.entrypoint
async def execute(self, email: str) -> bool:
with run_activities_locally():
result = await validate_email(email)
return resultMixed Execution
Combine local and remote activities:
@workflows.workflow.define(name="mixed-workflow")
class MixedWorkflow:
@workflows.workflow.entrypoint
async def execute(self, email: str) -> dict:
# Fast lookup runs locally
with run_activities_locally():
domain = await quick_lookup(email)
# Slow operation runs as regular activity
verified = await external_api_call(email)
return {"domain": domain, "verified": verified}Performance Impact
| Execution Mode | Typical Latency | Best For |
|---|---|---|
| Local Activity | < 10ms | Quick computations, lookups |
| Regular Activity | 50-200ms | API calls, complex logic |
Limitations
Bypasses standard scheduling:
- No task queue isolation - activities share resources with workflows
- Worker process blocking - slow local activities block the workflow worker
- No independent scaling - cannot scale local activities separately
- Limited failure isolation - worker crashes affect both workflows and local activities
⚠️ Warning: Local activities can cause worker crashes if they fail unexpectedly. Unlike regular activities, local activity failures can bring down the entire worker process, potentially causing data loss and requiring manual intervention to restart the workflow. Activity settings (partial support): Timeouts, retry policies, and dependency injection work the same for local activities. However, rate limiting is NOT supported for local activities.
Note: The sticky_to_worker, rate_limit, and heartbeat_timeout parameters don't apply to local activities since they already run in the workflow worker process.
@workflows.activity(
start_to_close_timeout=timedelta(seconds=5),
retry_policy_max_attempts=3
)
async def timed_local_activity(data: str) -> str:
# Timeouts and retry policies apply even when running locally
# Rate limiting is NOT supported for local activities
return data.upper()