Activities

Activities perform the actual work in your application while workflows coordinate them.

What is an activity?

An activity is a unit of work that performs actual computations, API calls, or other operations. Key characteristics:

  • Must be idempotent (safe to retry)
  • Execute in isolated processes
  • Have automatic retry mechanisms
  • Accept any JSON-serializable types as inputs/outputs (str, int, dict, list, etc.)
  • Input/output limited to 2MB
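The last two constraints can be checked before an activity is called. The following is a minimal sketch, assuming the 2MB limit means 2 * 1024 * 1024 bytes of UTF-8 encoded JSON. The helper name `check_payload` and the size accounting are illustrative assumptions, not part of the mistralai.workflows API:

```python
import json

# Assumption: "2MB" is read as 2 * 1024 * 1024 bytes of UTF-8 encoded JSON.
MAX_PAYLOAD_BYTES = 2 * 1024 * 1024


def check_payload(value: object) -> int:
    """Return the encoded size of `value`, or raise if it cannot be sent.

    Hypothetical helper for illustration; not part of the library.
    """
    try:
        encoded = json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"payload is not JSON-serializable: {exc}") from exc
    if len(encoded) > MAX_PAYLOAD_BYTES:
        raise ValueError(
            f"payload is {len(encoded)} bytes, over the {MAX_PAYLOAD_BYTES}-byte limit"
        )
    return len(encoded)
```

Calling `check_payload({"a": 1})` returns the encoded size, while a value such as a Python `set` is rejected because `json.dumps` cannot encode it.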
Defining an Activity

Basic activity structure:

import mistralai.workflows as workflows

@workflows.activity()
async def my_activity(input_data: str, count: int = 1) -> dict:
    """Activity implementation"""
    # Perform work here
    return {"result": input_data, "processed_count": count}
Core Activity Features

1. Activity Naming

Customize activity identification with:

@workflows.activity(
    name="custom_activity_name",  # Used for registration, execution, and observability
)

Learn more about observability and naming

2. Timeouts

Configure execution time limits to prevent runaway activities:

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=10)
)

Learn more about timeouts
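For intuition about what a start-to-close limit does, here is a plain asyncio analogy. It is only a sketch: `slow_operation` and `run_with_limit` are hypothetical names, and this is not how the library enforces timeouts internally.

```python
import asyncio
from datetime import timedelta


async def slow_operation() -> str:
    # Stands in for work that may overrun its time budget.
    await asyncio.sleep(0.2)
    return "done"


async def run_with_limit(limit: timedelta) -> str:
    """Cancel the operation if it runs longer than `limit`."""
    try:
        return await asyncio.wait_for(
            slow_operation(), timeout=limit.total_seconds()
        )
    except asyncio.TimeoutError:
        # In a workflow engine this is where a retry would be scheduled.
        return "timed out"
```

With a 20 ms limit the operation is cancelled. With a 2 second limit it completes normally.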

3. Retry Policies

Automatic retries with configurable policies to handle transient failures:

@workflows.activity(
    retry_policy_max_attempts=5,
    retry_policy_backoff_coefficient=2.0
)
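To see what these two parameters imply, the sketch below computes the wait before each retry under exponential backoff. It rests on two assumptions the text above does not confirm: the initial interval is 1 second, and `retry_policy_max_attempts` counts the first attempt. The function `retry_delays` is hypothetical.

```python
def retry_delays(
    max_attempts: int,
    backoff_coefficient: float,
    initial_interval_s: float = 1.0,  # assumed default, for illustration only
) -> list[float]:
    """Delays in seconds before each retry under exponential backoff.

    Assumes max_attempts includes the first try, so there are
    max_attempts - 1 retries.
    """
    retries = max(max_attempts - 1, 0)
    return [initial_interval_s * backoff_coefficient**i for i in range(retries)]


delays = retry_delays(max_attempts=5, backoff_coefficient=2.0)
# delays == [1.0, 2.0, 4.0, 8.0]
```

Under these assumptions, five attempts with a coefficient of 2.0 spend 15 seconds waiting in total before the activity is reported as failed.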

4. Worker Stickiness

Execute activities on the same worker for performance optimization:

@workflows.activity(
    sticky_to_worker=True
)

See Sticky Worker Sessions for detailed documentation

5. Heartbeat Timeout

Detect stuck activities quickly by requiring periodic heartbeat signals:

from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows import activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30)
)
async def long_running_task(items: list[str]) -> dict:
    results = []
    for i, item in enumerate(items):
        result = await process_item(item)
        results.append(result)
        # Report progress to prevent timeout
        activity.heartbeat({"processed": i + 1, "total": len(items)})
    return {"results": results}

When heartbeat_timeout is set, activities must call activity.heartbeat() periodically. If no heartbeat is received within the timeout, the activity is considered failed and a retry is triggered. This enables fast detection of stuck activities without waiting for the full start_to_close_timeout.
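The detection rule described above can be modelled in a few lines. This is a toy sketch with timestamps passed in explicitly. `HeartbeatMonitor` is a hypothetical class, and treating the activity start as the first heartbeat is an assumption.

```python
from datetime import timedelta


class HeartbeatMonitor:
    """Toy model of heartbeat tracking (illustrative only)."""

    def __init__(self, heartbeat_timeout: timedelta, started_at: float = 0.0) -> None:
        self.timeout_s = heartbeat_timeout.total_seconds()
        # Assumption: starting the activity counts as the first heartbeat.
        self.last_beat = started_at

    def heartbeat(self, now: float) -> None:
        self.last_beat = now

    def is_stuck(self, now: float) -> bool:
        # Stuck means silent for longer than the heartbeat timeout.
        return now - self.last_beat > self.timeout_s


monitor = HeartbeatMonitor(timedelta(seconds=30))
monitor.heartbeat(now=20.0)                 # progress reported at t=20s
stuck_at_45 = monitor.is_stuck(now=45.0)    # 25s of silence: still healthy
stuck_at_51 = monitor.is_stuck(now=51.0)    # 31s of silence: flagged
```

With a 30 second heartbeat timeout, the stall is caught at about 51 seconds rather than at the end of the 30 minute start-to-close window.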

Note: heartbeat_timeout is not supported for local activities.

Activity vs Workflow

Feature      Activity            Workflow
Duration     Seconds to minutes  Seconds to years
State        Stateless           Stateful
Retries      Automatic           Manual recovery
Parallelism  Single operation    Complex coordination
Granularity and Failure Handling

Activities should be designed to be as granular as possible. This means breaking down complex tasks into smaller, manageable activities. However, each activity should encapsulate all the logic that is susceptible to failure. This granular approach has several benefits:

  • Isolation of Failures: Smaller activities make it easier to isolate and handle failures. If a failure occurs, only the affected activity needs to be retried.
  • Easier Debugging: Granular activities make it easier to identify the exact point of failure and debug issues.
  • Better Retry Mechanisms: Since each activity is idempotent and can be retried independently, granular activities ensure that only the failed part of the process is retried, saving time and resources.

For example, if you have a process that involves fetching data from an API, processing that data, and then storing it in a database, you might define three separate activities:

  1. Fetch data from API
  2. Process the data
  3. Store the data in the database
Nested Activities

When designing activities, it's important to understand how state management works, especially when activities are nested within each other. If an activity is encapsulated within another activity, only the parent activity will be considered for retries and state management. This means that the state will be saved before the nested activity, and only the parent activity will be retried in case of failure.

import mistralai.workflows as workflows

@workflows.activity()
async def parent_activity(input_data: str) -> dict:
    """Parent activity that encapsulates a nested activity."""
    # Some logic here
    nested_result = await nested_activity(input_data)
    return {"result_data": nested_result["result_data"]}

@workflows.activity()
async def nested_activity(input_data: str) -> dict:
    """Nested activity."""
    # Logic for the nested activity
    return {"result_data": "processed_data"}

In this example, if nested_activity fails, the entire parent_activity will be retried, not just the nested activity. The state will be saved before the nested activity is called.
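The retry behaviour described here can be simulated without the library. In this sketch, `run_with_retries` is a hypothetical stand-in for the platform's retry policy, and the nested step is made to fail exactly once so the effect on the call counts is visible.

```python
calls = {"parent": 0, "nested": 0}


def nested_step() -> str:
    calls["nested"] += 1
    if calls["nested"] == 1:
        raise RuntimeError("transient failure")  # fail on the first call only
    return "processed_data"


def parent_step() -> str:
    calls["parent"] += 1
    # Plain call: the nested step has no retry boundary of its own.
    return nested_step()


def run_with_retries(fn, max_attempts: int = 3):
    """Hypothetical retry loop standing in for the platform's retry policy."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RuntimeError:
            if attempt == max_attempts:
                raise


result = run_with_retries(parent_step)
# calls == {"parent": 2, "nested": 2}
```

The parent runs twice even though only the nested step failed, so any work the parent does before the nested call is repeated and needs to be idempotent.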

Local Activities

For performance optimization, activities can run directly in the workflow worker process instead of being scheduled through the task queue. This is useful for fast operations (< 1 second) where scheduling overhead is significant.

When to Use

Good for:

  • Quick computations, validations, data transformations
  • High-volume scenarios where latency matters
  • Operations completing in under 1 second

Avoid for:

  • Long-running operations (> a few seconds)
  • External API calls or I/O-heavy tasks
  • Operations needing separate retry isolation or scaling

Basic Usage

import mistralai.workflows as workflows
from mistralai.workflows import run_activities_locally

@workflows.activity()
async def validate_email(email: str) -> bool:
    return "@" in email

@workflows.workflow.define(name="user-workflow")
class UserWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> bool:
        with run_activities_locally():
            result = await validate_email(email)
        return result

Mixed Execution

Combine local and remote activities:

@workflows.workflow.define(name="mixed-workflow")
class MixedWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> dict:
        # Fast lookup runs locally
        with run_activities_locally():
            domain = await quick_lookup(email)

        # Slow operation runs as regular activity
        verified = await external_api_call(email)

        return {"domain": domain, "verified": verified}

Performance Impact

Execution Mode    Typical Latency  Best For
Local Activity    < 10ms           Quick computations, lookups
Regular Activity  50-200ms         API calls, complex logic

Limitations

Local activities bypass standard scheduling, which has several consequences:

  • No task queue isolation - activities share resources with workflows
  • Worker process blocking - slow local activities block the workflow worker
  • No independent scaling - cannot scale local activities separately
  • Limited failure isolation - worker crashes affect both workflows and local activities

⚠️ Warning: Local activities can cause worker crashes if they fail unexpectedly. Unlike regular activities, local activity failures can bring down the entire worker process, potentially causing data loss and requiring manual intervention to restart the workflow.

Activity settings (partial support): Timeouts, retry policies, and dependency injection work the same for local activities. However, rate limiting is NOT supported for local activities.

Note: The sticky_to_worker, rate_limit, and heartbeat_timeout parameters don't apply to local activities since they already run in the workflow worker process.

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(seconds=5),
    retry_policy_max_attempts=3
)
async def timed_local_activity(data: str) -> str:
    # Timeouts and retry policies apply even when running locally
    # Rate limiting is NOT supported for local activities
    return data.upper()
Sticky Worker Sessions

Sticky worker sessions route multiple activities to the same worker instance, enabling resource reuse and stateful operations across activity calls.

When to Use

Good for:

  • Sharing expensive resources (ML models, database connections, cached data)
  • Stateful data processing across multiple activity calls
  • Reducing initialization overhead for related operations

Avoid for:

  • Activities that must be highly available (session breaks if worker crashes)
  • Long-running workflows (ties up specific worker resources)
  • When worker-level state isn't needed

Basic Usage

import mistralai.workflows as workflows
from mistralai.workflows import run_sticky_worker_session

# Worker-level state
_loaded_model = None

@workflows.activity(sticky_to_worker=True)
async def load_model() -> None:
    global _loaded_model
    if _loaded_model is None:
        _loaded_model = load_expensive_model()

@workflows.activity(sticky_to_worker=True)
async def predict(data: dict) -> dict:
    return _loaded_model.predict(data)

@workflows.workflow.define(name="ml-inference")
class MLInferenceWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, batch: list[dict]) -> list[dict]:
        # All activities run on same worker
        async with run_sticky_worker_session():
            await load_model()  # Load once
            results = [await predict(item) for item in batch]
        return results

Session Reuse

Capture a session explicitly to reuse across multiple scopes:

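# Assumption: get_sticky_worker_session is importable from the same module
# as run_sticky_worker_session.
from mistralai.workflows import get_sticky_worker_session, run_sticky_worker_session

@workflows.workflow.define(name="multi-batch")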
class MultiBatchWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, batches: list[list[dict]]) -> list[list[dict]]:
        # Capture worker once
        session = await get_sticky_worker_session()

        all_results = []
        for batch in batches:
            async with run_sticky_worker_session(session):
                results = [await process_item(item) for item in batch]
                all_results.append(results)
        return all_results

Limitations

Session breaks on worker failure:

  • If the worker crashes or scales down, the session ends
  • Subsequent activities route to a different worker
  • Design activities to handle cold starts gracefully

In-memory state only:

  • Worker-level state is lost on worker restart or redeployment
  • Use databases or external storage for persistent state
  • Not a replacement for distributed state management

Worker resource contention:

  • Long-running sessions tie up specific worker capacity
  • Can create hot spots if many workflows target the same worker
  • Monitor worker utilization to avoid resource starvation

Note: The sticky_to_worker parameter doesn't apply to local activities since they already run in the workflow worker process.

Performance Comparison

Feature           Regular Activity  Sticky Session  Local Activity
Routing Overhead  Standard          Standard        None
Worker Isolation  Yes               Yes             No
Resource Sharing  No                Yes             N/A
Best For          Independent ops   Resource reuse  Fast lookups