Activities
Activities perform the actual work in your application while workflows coordinate them.
What is an activity?
An activity is a unit of work that performs actual computations, API calls, or other operations. Key characteristics:
- Must be idempotent (safe to retry)
- Execute in isolated processes
- Have automatic retry mechanisms
- Accept any JSON-serializable types as inputs/outputs (str, int, dict, list, etc.)
- Input/output limited to 2MB
Defining an Activity
Basic activity structure:
```python
import mistralai.workflows as workflows


@workflows.activity()
async def my_activity(input_data: str, count: int = 1) -> dict:
    """Activity implementation"""
    # Perform work here
    return {"result": input_data, "processed_count": count}
```
Core Activity Features
1. Activity Naming
Customize activity identification with:
```python
@workflows.activity(
    name="custom_activity_name",  # Used for registration, execution, and observability
)
async def my_activity(input_data: str) -> dict:
    ...
```
Learn more about observability and naming.
2. Timeouts
Configure execution time limits to prevent runaway activities:
```python
from datetime import timedelta


@workflows.activity(
    start_to_close_timeout=timedelta(minutes=10)
)
async def my_activity(input_data: str) -> dict:
    ...
```
3. Retry Policies
Automatic retries with configurable policies to handle transient failures:
```python
@workflows.activity(
    retry_policy_max_attempts=5,
    retry_policy_backoff_coefficient=2.0
)
async def my_activity(input_data: str) -> dict:
    ...
```
4. Worker Stickiness
Execute activities on the same worker for performance optimization:
```python
@workflows.activity(
    sticky_to_worker=True
)
async def my_activity(input_data: str) -> dict:
    ...
```
See Sticky Worker Sessions for detailed documentation.
5. Heartbeat Timeout
Detect stuck activities quickly by requiring periodic heartbeat signals:
```python
from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows import activity


@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30)
)
async def long_running_task(items: list[str]) -> dict:
    results = []
    for i, item in enumerate(items):
        # process_item is a placeholder for your own per-item coroutine
        result = await process_item(item)
        results.append(result)
        # Report progress to prevent timeout
        activity.heartbeat({"processed": i + 1, "total": len(items)})
    return {"results": results}
```
When heartbeat_timeout is set, activities must call activity.heartbeat() periodically. If no heartbeat is received within the timeout, the activity is considered failed and a retry is triggered. This enables fast detection of stuck activities without waiting for the full start_to_close_timeout.
Note: Not supported for local activities.
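The detection rule described for heartbeat_timeout (no heartbeat within the window means the activity is treated as failed) can be sketched as a small watchdog. This is only an illustration of the idea, not how the service implements it: `HeartbeatMonitor` and its injectable `clock` are hypothetical names introduced here.

```python
import time
from typing import Any, Callable, Optional


class HeartbeatMonitor:
    """Tracks the most recent heartbeat and reports when it is overdue."""

    def __init__(self, timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._last_beat = clock()  # the start counts as the first beat
        self.last_details: Optional[Any] = None

    def heartbeat(self, details: Optional[Any] = None) -> None:
        """Record a heartbeat, optionally with progress details."""
        self._last_beat = self._clock()
        self.last_details = details

    def is_stuck(self) -> bool:
        """True when no heartbeat arrived within the timeout window."""
        return self._clock() - self._last_beat > self.timeout_seconds
```

With a 30-second window, a task that last reported at t=20 is still healthy at t=45 and is flagged at t=51, well before a 30-minute start_to_close_timeout would expire.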
Activity vs Workflow
| Feature | Activity | Workflow |
|---|---|---|
| Duration | Seconds to minutes | Seconds to years |
| State | Stateless | Stateful |
| Retries | Automatic | Manual recovery |
| Parallelism | Single operation | Complex coordination |
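To make the "Automatic" retries entry more concrete, the sketch below computes the waits between attempts for retry_policy_max_attempts=5 and retry_policy_backoff_coefficient=2.0. The 1-second initial interval and the plain exponential formula are assumptions made for illustration; the actual defaults and any cap on the interval are not stated in this document, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(max_attempts: int, coefficient: float,
                   initial_seconds: float = 1.0) -> list[float]:
    """Delays between consecutive attempts under simple exponential backoff.

    There is one delay fewer than there are attempts, because no wait
    follows the final attempt.
    """
    return [initial_seconds * coefficient ** i for i in range(max_attempts - 1)]


delays = backoff_delays(max_attempts=5, coefficient=2.0)
print(delays)  # [1.0, 2.0, 4.0, 8.0]
```

Under these assumptions, five attempts spend 15 seconds waiting in total before the activity is reported as failed.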
Granularity and Failure Handling
Design activities to be as granular as possible: break complex tasks into smaller, manageable activities, while keeping all the failure-prone logic for a given step inside a single activity. This granular approach has several benefits:
- Isolation of Failures: Smaller activities make it easier to isolate and handle failures. If a failure occurs, only the affected activity needs to be retried.
- Easier Debugging: Granular activities make it easier to identify the exact point of failure and debug issues.
- Better Retry Mechanisms: Since each activity is idempotent and can be retried independently, granular activities ensure that only the failed part of the process is retried, saving time and resources.
For example, if you have a process that involves fetching data from an API, processing that data, and then storing it in a database, you might define three separate activities:
- Fetch data from API
- Process the data
- Store the data in the database
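The three-step split above can be sketched in plain asyncio to show why granularity helps with retries. Everything here is a stand-in: `run_with_retry` mimics per-activity retries, the three step functions and the `attempts` counter are hypothetical, and the URL is a placeholder. In a real workflow each step would be decorated as an activity.

```python
import asyncio

attempts = {"fetch": 0, "process": 0, "store": 0}


async def run_with_retry(step, *args, max_attempts: int = 3):
    """Retry a single step on failure; stands in for per-activity retries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await step(*args)
        except Exception:
            if attempt == max_attempts:
                raise


async def fetch_data(url: str) -> dict:
    attempts["fetch"] += 1
    return {"url": url, "values": [1, 2, 3]}


async def process_data(raw: dict) -> dict:
    attempts["process"] += 1
    if attempts["process"] == 1:
        raise RuntimeError("transient failure")  # fails on the first try only
    return {"total": sum(raw["values"])}


async def store_data(processed: dict) -> str:
    attempts["store"] += 1
    return f"stored total={processed['total']}"


async def pipeline(url: str) -> str:
    raw = await run_with_retry(fetch_data, url)
    processed = await run_with_retry(process_data, raw)
    return await run_with_retry(store_data, processed)


result = asyncio.run(pipeline("https://example.com/data"))
```

Only the processing step runs twice; the fetch is not repeated, which is the saving the granular design is meant to provide.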
Nested Activities
When one activity calls another, it helps to understand how state management works. If an activity is encapsulated within another activity, only the parent activity is considered for retries and state management. The state is saved before the nested activity runs, and only the parent activity is retried on failure.
```python
import mistralai.workflows as workflows


@workflows.activity()
async def parent_activity(input_data: str) -> dict:
    """Parent activity that encapsulates a nested activity."""
    # Some logic here
    nested_result = await nested_activity(input_data)
    return {"result_data": nested_result["result_data"]}


@workflows.activity()
async def nested_activity(input_data: str) -> dict:
    """Nested activity."""
    # Logic for the nested activity
    return {"result_data": "processed_data"}
```
In this example, if nested_activity fails, the entire parent_activity will be retried, not just the nested activity. The state will be saved before the nested activity is called.
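The retry behavior described here can be simulated without the library. In this sketch, `parent_step`, `nested_step`, and `retry_parent` are hypothetical stand-ins: the retry loop wraps only the parent, so a failure in the nested call re-runs the parent from the top.

```python
import asyncio

call_counts = {"parent": 0, "nested": 0}


async def nested_step(data: str) -> dict:
    call_counts["nested"] += 1
    if call_counts["nested"] == 1:
        raise RuntimeError("nested failure")  # fails on the first call only
    return {"result_data": data.upper()}


async def parent_step(data: str) -> dict:
    call_counts["parent"] += 1  # parent logic runs again on every retry
    nested = await nested_step(data)
    return {"result_data": nested["result_data"]}


async def retry_parent(data: str, max_attempts: int = 3) -> dict:
    """Retries are applied at the parent level only."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await parent_step(data)
        except RuntimeError:
            if attempt == max_attempts:
                raise


outcome = asyncio.run(retry_parent("abc"))
```

Both counters end at 2: the single nested failure caused the parent's own logic to run a second time, so any work the parent does before the nested call should be safe to repeat.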
Local Activities
For performance optimization, activities can run directly in the workflow worker process instead of being scheduled through the task queue. This is useful for fast operations (< 1 second) where scheduling overhead is significant.
When to Use
Good for:
- Quick computations, validations, data transformations
- High-volume scenarios where latency matters
- Operations completing in under 1 second
Avoid for:
- Long-running operations (> a few seconds)
- External API calls or I/O-heavy tasks
- Operations needing separate retry isolation or scaling
Basic Usage
```python
import mistralai.workflows as workflows
from mistralai.workflows import run_activities_locally


@workflows.activity()
async def validate_email(email: str) -> bool:
    return "@" in email


@workflows.workflow.define(name="user-workflow")
class UserWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> bool:
        # Activities called inside this block run in the workflow worker process
        with run_activities_locally():
            result = await validate_email(email)
        return result
```
Mixed Execution
Combine local and remote activities:
```python
# quick_lookup and external_api_call are activities defined elsewhere
@workflows.workflow.define(name="mixed-workflow")
class MixedWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> dict:
        # Fast lookup runs locally
        with run_activities_locally():
            domain = await quick_lookup(email)
        # Slow operation runs as regular activity
        verified = await external_api_call(email)
        return {"domain": domain, "verified": verified}
```
Performance Impact
| Execution Mode | Typical Latency | Best For |
|---|---|---|
| Local Activity | < 10ms | Quick computations, lookups |
| Regular Activity | 50-200ms | API calls, complex logic |
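A quick back-of-the-envelope calculation shows how the per-call latencies in the table add up. The workload of 1,000 sequential calls is a hypothetical figure chosen for illustration, and `total_latency_seconds` is a helper introduced here; the per-call numbers are the bounds from the table.

```python
def total_latency_seconds(num_calls: int, latency_ms: float) -> float:
    """Cumulative latency for sequential calls, in seconds."""
    return num_calls * latency_ms / 1000


NUM_CALLS = 1_000  # hypothetical workload

local_upper = total_latency_seconds(NUM_CALLS, 10)     # local: under 10 ms each
regular_low = total_latency_seconds(NUM_CALLS, 50)     # regular: 50 ms each
regular_high = total_latency_seconds(NUM_CALLS, 200)   # regular: 200 ms each

print(local_upper, regular_low, regular_high)  # 10.0 50.0 200.0
```

At this volume the local path stays under 10 seconds of cumulative latency, while regular scheduling adds between 50 and 200 seconds, which is why local execution pays off mainly for many small calls.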
Limitations
Bypasses standard scheduling:
- No task queue isolation - activities share resources with workflows
- Worker process blocking - slow local activities block the workflow worker
- No independent scaling - cannot scale local activities separately
- Limited failure isolation - worker crashes affect both workflows and local activities
⚠️ Warning: Local activities can cause worker crashes if they fail unexpectedly. Unlike regular activities, local activity failures can bring down the entire worker process, potentially causing data loss and requiring manual intervention to restart the workflow.
Activity settings (partial support): Timeouts, retry policies, and dependency injection work the same for local activities. However, rate limiting is NOT supported for local activities.
Note: The sticky_to_worker, rate_limit, and heartbeat_timeout parameters don't apply to local activities since they already run in the workflow worker process.
```python
from datetime import timedelta


@workflows.activity(
    start_to_close_timeout=timedelta(seconds=5),
    retry_policy_max_attempts=3
)
async def timed_local_activity(data: str) -> str:
    # Timeouts and retry policies apply even when running locally
    # Rate limiting is NOT supported for local activities
    return data.upper()
```
Sticky Worker Sessions
Sticky worker sessions route multiple activities to the same worker instance, enabling resource reuse and stateful operations across activity calls.
When to Use
Good for:
- Sharing expensive resources (ML models, database connections, cached data)
- Stateful data processing across multiple activity calls
- Reducing initialization overhead for related operations
Avoid for:
- Activities that must be highly available (session breaks if worker crashes)
- Long-running workflows (ties up specific worker resources)
- When worker-level state isn't needed
Basic Usage
```python
import mistralai.workflows as workflows
from mistralai.workflows import run_sticky_worker_session

# Worker-level state
_loaded_model = None


@workflows.activity(sticky_to_worker=True)
async def load_model() -> None:
    global _loaded_model
    if _loaded_model is None:
        # load_expensive_model is a placeholder for your own loader
        _loaded_model = load_expensive_model()


@workflows.activity(sticky_to_worker=True)
async def predict(data: dict) -> dict:
    return _loaded_model.predict(data)


@workflows.workflow.define(name="ml-inference")
class MLInferenceWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, batch: list[dict]) -> list[dict]:
        # All activities run on same worker
        async with run_sticky_worker_session():
            await load_model()  # Load once
            results = [await predict(item) for item in batch]
        return results
```
Session Reuse
Capture a session explicitly to reuse across multiple scopes:
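```python
# Assumption: get_sticky_worker_session is importable from the same module
# as run_sticky_worker_session; the original snippet does not show its import.
from mistralai.workflows import get_sticky_worker_session, run_sticky_worker_session


@workflows.workflow.define(name="multi-batch")
class MultiBatchWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, batches: list[list[dict]]) -> list[list[dict]]:
        # Capture worker once
        session = await get_sticky_worker_session()
        all_results = []
        for batch in batches:
            async with run_sticky_worker_session(session):
                # process_item is an activity defined elsewhere
                results = [await process_item(item) for item in batch]
            all_results.append(results)
        return all_results
```
Limitations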
Session breaks on worker failure:
- If the worker crashes or scales down, the session ends
- Subsequent activities route to a different worker
- Design activities to handle cold starts gracefully
In-memory state only:
- Worker-level state is lost on worker restart or redeployment
- Use databases or external storage for persistent state
- Not a replacement for distributed state management
Worker resource contention:
- Long-running sessions tie up specific worker capacity
- Can create hot spots if many workflows target the same worker
- Monitor worker utilization to avoid resource starvation
Note: The sticky_to_worker parameter doesn't apply to local activities since they already run in the workflow worker process.
Performance Comparison
| Feature | Regular Activity | Sticky Session | Local Activity |
|---|---|---|---|
| Routing Overhead | Standard | Standard | None |
| Worker Isolation | Yes | Yes | No |
| Resource Sharing | No | Yes | N/A |
| Best For | Independent ops | Resource reuse | Fast lookups |